返回 导航

Spark

hangge.com

Spark - Spark Streaming使用详解8(DStream有状态转换操作3:滑动窗口转换操作)

作者:hangge | 2023-12-17 11:06
    前面的文章中,我介绍了 Spark Streaming 的有状态转换操作,如 updateStateByKeymapWithState。本文将介绍滑动窗口转换操作。滑动窗口允许我们在一个可调整大小的时间窗口内对数据进行操作和分析,这在实时数据流处理中非常实用。

八、DStream 有状态转换操作3:滑动窗口转换操作

1,基本介绍

(1)滑动窗口操作是一种允许我们对一段时间内的数据执行操作的机制,窗口会根据时间滑动,以便持续分析流数据。
(2)所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。 
  • 窗口时长:计算内容的时间范围
  • 滑动步长:隔多久触发一次计算。
注意:
  • 窗口时长”和“滑动步长”都必须为采集周期大小的整数倍。
  • 在滑动窗口中,数据可能会被多次计算,需要小心处理,避免重复计算。

2,window(windowLength, slideInterval)

(1)该方法基于源 DStream 中的窗口批次进行计算,返回一个新的 DStream,其中的元素是滑动窗口内的批次数据。在每个窗口内,我们可以对批次数据进行任意操作,例如转换、聚合等。
(2)下面示例中我们使用了 window 函数来创建一个滑动窗口,窗口长度为 10 秒,滑动间隔为 5 秒。因此,我们会对连续 10 秒内的数据进行单词频度计算,并且每 5 秒滑动一次。
object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")

    // 初始化 StreamingContext,设置微批处理的时间间隔为 5 秒
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // 设置检查点以保存状态
    ssc.checkpoint("./checkpoint")

    // 通过监控端口创建 DStream,读进来的数据为一行行
    val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)

    // 对 DStream 进行转换操作,将每一行数据做切分,形成一个个单词
    val wordStreams = inputDStream.flatMap(_.split(" "))

    // 将单词映射成元组(word,1)
    val wordAndOneStreams = wordStreams.map((_, 1))

    // 使用window函数进行滑动窗口操作
    val windowedCounts = wordAndOneStreams.window(Seconds(10), Seconds(5)).reduceByKey(_ + _)

    // 打印结果
    windowedCounts.print()

    // 启动 StreamingContext
    ssc.start()

    // 等待应用程序终止(遇错停止,否则不断工作)
    ssc.awaitTermination()
  }
}

(3)我们每隔 5 秒,依次输入数据 abc,应用程序这边控制台数据信息如下:

3,countByWindow(windowLength, slideInterval)

(1)该方法返回一个滑动窗口计数流中的元素个数。对于每个窗口,它会返回窗口内元素的数量。
// 通过监控端口创建 DStream,读进来的数据为一行行
val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)

// 对 DStream 进行转换操作,将每一行数据做切分,形成一个个单词
val wordStreams = inputDStream.flatMap(_.split(" "))

// 使用window函数进行滑动窗口操作
val countStream = wordStreams.countByWindow(Seconds(10), Seconds(5))

// 打印结果
countStream.print()

(2)我们每隔 5 秒,依次输入数据 aa b 以及 a b c,应用程序这边控制台数据信息如下:

4,countByValueAndWindow(windowLength, slideInterval)

(1)该方法用于在滑动窗口内对元素进行计数,然后它会返回一个 DStream,其中的每个元素都是 (element, count) 的键值对。
// 通过监控端口创建 DStream,读进来的数据为一行行
val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)

// 对 DStream 进行转换操作,将每一行数据做切分,形成一个个单词
val wordStreams = inputDStream.flatMap(_.split(" "))

// 使用window函数进行滑动窗口操作
val countByValueStream = wordStreams.countByValueAndWindow(Seconds(10), Seconds(5))

// 打印结果
countByValueStream.print()

(2)我们每隔5秒,依次输入数据 aa b 以及 a b c,应用程序这边控制台数据信息如下:

5,reduceByWindow(func, windowLength, slideInterval)

(1)该方法通过使用自定义的函数 func 来整合滑动窗口中的批次数据,创建一个新的单元素流。这个函数会对窗口内的数据应用 func 函数,以生成一个结果。
// 通过监控端口创建 DStream,读进来的数据为一行行
val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)

// 对 DStream 进行转换操作,将每一行数据做切分,形成一个个数字
val numStreams = inputDStream.flatMap(_.split(" ")).map(_.toInt)

// 使用window函数进行滑动窗口操作
val reducedStream = numStreams.reduceByWindow((a, b) => a + b, Seconds(10), Seconds(5))

// 打印结果
reducedStream.print()

(2)我们输入数据 1 2 后等待 5 秒再次输入 1 2,应用程序这边控制台数据信息如下:

6,reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

(1)该方法是在 (K, V) 键值对 DStream 上使用的,它会返回一个新的 (K, V) 键值对 DStream,其中的每个窗口都通过使用 reduce 函数来整合每个键的值。func 函数将被应用于窗口内的批次数据。
// 通过监控端口创建 DStream,读进来的数据为一行行
val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)

// 对 DStream 进行转换操作,将每一行数据做切分,形成一个个单词
val wordStreams = inputDStream.flatMap(_.split(" "))

// 将单词映射成元组(word,1)
val wordPairsStreams = wordStreams.map((_, 1))

// 使用window函数进行滑动窗口操作
val reducedStream = wordPairsStreams.reduceByKeyAndWindow((a, b) => a + b, Seconds(10), Seconds(5))

// 打印结果
reducedStream.print()

(2)我们每隔 5 秒,依次输入数据 a bb c 以及 c d,应用程序这边控制台数据信息如下:

7,reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

(1)该方法是 reduceByKeyAndWindow 方法的变体,它允许我们通过使用先前窗口的 reduce 值来递增计算当前窗口的 reduce 值。这可以在数据增量计算中很有用。
// 通过监控端口创建 DStream,读进来的数据为一行行
val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)

// 对 DStream 进行转换操作,将每一行数据做切分,形成一个个单词
val wordStreams = inputDStream.flatMap(_.split(" "))

// 将单词映射成元组(word,1)
val wordPairsStreams = wordStreams.map((_, 1))

// 使用window函数进行滑动窗口操作
val reducedStream = wordPairsStreams.reduceByKeyAndWindow(
  (a, b) => a + b,  // 增量计算函数
  (a, b) => a - b,  // 反向增量计算函数,
  Seconds(10), Seconds(5))

// 打印结果
reducedStream.print()

(2)我们每隔 5 秒,依次输入数据 a bb c 以及 c d,应用程序这边控制台数据信息如下:
提示:虽然该代码结果同前一个代码的结果差不多,但当窗口时长很大时这种方式的效率会高很多,因为每次只要对刚进入窗口的批次和刚离开窗口的批次进行计算即可,而无需对窗口内所有批次数据重新计算。
评论

全部评论(0)

回到顶部