返回 导航

Spark

hangge.com

Spark - Structured Streaming使用详解7(水印Watermark处理延迟数据)

作者:hangge | 2024-01-17 08:43
    在数据分析系统中,Structured Streaming 可以持续的按照 event-time 聚合数据,然而在此过程中并不能保证数据按照时间的先后依次到达。例如:当前接收的某一条数据的 event-time 可能远远早于之前已经处理过的 event-time。在发生这种情况时,往往需要结合业务需求对延迟数据进行过滤。
    为了实现这个需求,从 Spark 2.1 起,引入了 watermark(水印),使用引擎可以自动地跟踪当前的事件时间,并据此尝试删除旧状态。

九、水印 Watermark 处理延迟数据

1,基本介绍

(1)Watermark 是一种时间戳概念,它表示事件时间(event time)在流式数据中的进度。我们通过指定 event-time 列和预估事件的延迟时间上限来定义一个查询的 watermark。针对一个以时间 T 结束的窗口, 引擎会保留状态和允许延迟时间直到max event time seen by the engine - late threshold) > T 。换句话说,延迟时间在上限内的被聚合,延迟时间超出上限的开始被丢弃。
注意:
  • watermask 会在处理当前批次数据时更新,并且会在处理下一个批次数据时生效使用。但如果节点发生故障,则可能延迟若干批次生效。
  • 初始化 wartmark0
  • watermark 只能逐渐增加,不能减少

(2)Structured Streaming 引入 Watermark 机制,主要是为了解决以下两个问题:
  • 处理聚合中的延迟数据
  • 减少内存中维护的聚合状态

(3)在不同输出模式,Watermark 会产生不同的影响:
  • 在输出模式是 append 时,必须设置 watermask 才能使用聚合操作。其实,watermask 定义了 append 模式中何时输出聚合结果(状态),并清理过期状态。
  • 在输出模式是 update 时,watermask 主要用于过滤过期数据并及时清理过期状态。
  • 不能是 complete 模式,因为 complete 的时候(必须有聚合),要求每次输出所有的聚合结果。我们使用 watermark 的目的是丢弃一些过时聚合数据,所以 complete 模式使用 wartermark 无效也无意义。

(4)可以通过 withWatermark() 来定义 watermarkwatermark 计算:watermark = MaxEventTime - Threshhod。注意:
  • withWatermark 必须使用与聚合操作中的时间戳列是同一列 .df.withWatermark("time", "1 min").groupBy("time2").count() 无效
  • withWatermark 必须在聚合之前调用 .df.groupBy("time").count().withWatermark("time", "1 min") 无效

2,update 模式下使用 watermark

(1)在 update 模式下,仅输出与之前批次的结果相比,涉及更新或新增的数据。下面是一个 update 模式下,使用水印的 word count 样例代码:
import org.apache.spark.sql.functions.window
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("Hello")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建一个流式DataFrame,这里从socket中读取数据
    val lines: DataFrame = spark.readStream
      .format("socket") // 设置数据源
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // 解析原始输入数据,假设原始数据格式为"单词1 单词2 单词3,事件时间"
    val wordsWithTimestamp: DataFrame = lines.as[String]
      .map(line => {
        val parts = line.split(",")
        val words = parts(0).split(" ")
        val timestamp = Timestamp.valueOf(parts(1).trim())
        (words, timestamp)
      })
      .flatMap { case (words, timestamp) =>
        words.map(word => (word, timestamp))
      }
      .toDF("word", "timestamp")

    // 按照窗口和单词分组, 并且计算每组的单词的个数
    val wordCounts: Dataset[Row] = wordsWithTimestamp
      .withWatermark("timestamp", "2 minutes") // 添加水印,参数(event-time列名,延迟时间上限)
      .groupBy(
        // 调用 window 函数, 返回的是一个 Column,参数(df中表示时间戳的列、窗口长度、滑动步长)
        window($"timestamp", "15 minutes", "5 minutes"), $"word"
      )
      .count()  // 计数

    // 启动查询, 把结果打印到控制台
    val query = wordCounts.writeStream
      .outputMode("update")
      .format("console")
      .option("truncate", "false")  // 不截断.为了在控制台能看到完整信息, 最好设置为 false
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

(2)测试一下,首先我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(3)首先我们输入如下数据:
a,2023-09-05 10:31:00
  • 然后根据当前批次中最大的 event-time,计算出来下次使用的 watermark。本批次只有一个数据 (10:31),所以:watermark = 10:31 - 2min = 10:29

(4)接着我们输入如下数据:
a,2023-09-05 10:43:00
  • 其中 count2 的表示更新,count1 的表示新增。没有变化的就没有显示,但是内存中仍然保存着。也就是说第一批的如下数据仍在内存保存着:
|{2023-09-05 10:20:00, 2023-09-05 10:35:00}|a   |1    |
|{2023-09-05 10:25:00, 2023-09-05 10:40:00}|a   |1    |

(5)接着我们输入如下数据,相当于一条延迟数据.:
a,2023-09-05 10:39:00
|{2023-09-05 10:20:00, 2023-09-05 10:35:00}|a   |1    |
|{2023-09-05 10:25:00, 2023-09-05 10:40:00}|a   |1    |

3,append 模式下使用 wartermark

(1)在 append 模式下,仅输出新增的数据,且输出后的数据无法变更。下面是一个 append 模式下,使用水印的 word count 样例代码:
import org.apache.spark.sql.functions.window
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("Hello")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建一个流式DataFrame,这里从socket中读取数据
    val lines: DataFrame = spark.readStream
      .format("socket") // 设置数据源
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // 解析原始输入数据,假设原始数据格式为"单词1 单词2 单词3,事件时间"
    val wordsWithTimestamp: DataFrame = lines.as[String]
      .map(line => {
        val parts = line.split(",")
        val words = parts(0).split(" ")
        val timestamp = Timestamp.valueOf(parts(1).trim())
        (words, timestamp)
      })
      .flatMap { case (words, timestamp) =>
        words.map(word => (word, timestamp))
      }
      .toDF("word", "timestamp")

    // 按照窗口和单词分组, 并且计算每组的单词的个数
    val wordCounts: Dataset[Row] = wordsWithTimestamp
      .withWatermark("timestamp", "2 minutes") // 添加watermark, 参数(event-time列名,延迟时间的上限)
      .groupBy(
        // 调用 window 函数, 返回的是一个 Column,参数(df中表示时间戳的列、窗口长度、滑动步长)
        window($"timestamp", "15 minutes", "5 minutes"), $"word"
      )
      .count()  // 计数

    // 启动查询, 把结果打印到控制台
    val query = wordCounts.writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", "false")  // 不截断.为了在控制台能看到完整信息, 最好设置为 false
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

(2)测试一下,首先我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(3)首先我们输入如下数据:
a,2023-09-05 10:31:00
  • 然后根据当前批次中最大的 event-time,计算出来下次使用的 watermark。本批次只有一个数据 (10:31),所以:watermark = 10:31 - 2min = 10:29

(4)接着我们输入如下数据:
a,2023-09-05 10:43:00

(5)接着我们输入如下数据,相当于一条延迟数据.:
a,2023-09-05 10:39:00
评论

全部评论(0)

回到顶部