返回 导航

Spark

hangge.com

Spark - Spark Streaming使用详解6(DStream有状态转换操作1:UpdateStateByKey)

作者:hangge | 2023-12-13 09:00
    在前面的文章中,我介绍了 DStream 的无状态转换操作。接下来我们进一步探讨 Spark Streaming 中的有状态转换操作,这些操作允许我们跨批次保持数据状态,从而执行更复杂的分析和处理。本文首先介绍有状态转换操作中的 UpdateStateByKey 操作。

六、DStream 有状态转换操作1:UpdateStateByKey

1,基本介绍

(1)有时我们需要在 DStream 中跨批次维护状态(例如流计算中累加 wordcount)。针对这种情况,updateStateByKey() 为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由 (键,事件) 对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为 (键,状态) 对。
(2)updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的 (键,状态) 对组成的。
(3)updateStateByKey() 操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,需要做下面两步:
  • 定义状态,状态可以是一个任意的数据类型。
  • 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。
注意:使用 updateStateByKey 需要对检查点目录进行配置,会使用检查点来保存状态

2,使用样例

(1)下面是一个进阶版的实时单词计数样例,不同于之前每个批次的数据统计后就直接输出,利用 UpdateStateByKey 操作可以将单词的频度逐批次累积起来。
object Hello {
  // 定义状态更新函数, 法,参数 newValues 为当前批次单词频度,runningCount 为以往批次单词频度
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = newValues.sum + runningCount.getOrElse(0)
    Some(newCount)
  }

  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")

    // 初始化 StreamingContext,设置微批处理的时间间隔为 5 秒
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // 设置检查点以保存状态
    ssc.checkpoint("./checkpoint")

    // 通过监控端口创建 DStream,读进来的数据为一行行
    val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)

    // 对 DStream 进行转换操作,将每一行数据做切分,形成一个个单词
    val wordStreams = inputDStream.flatMap(_.split(" "))

    // 将单词映射成元组(word,1)
    val wordAndOneStreams = wordStreams.map((_, 1))

    // 使用updateStateByKey进行状态更新
    val statefulCounts = wordAndOneStreams.updateStateByKey(updateFunction)

    // 打印结果
    statefulCounts.print()

    // 启动 StreamingContext
    ssc.start()

    // 等待应用程序终止(遇错停止,否则不断工作)
    ssc.awaitTermination()
  }
}

(2)测试时我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(3)程序启动后,我们在该终端中输入一些文本数据,Spark Streaming 应用程序这边将会实时处理输入的文本数据并输出累计的结果: 
评论

全部评论(0)

回到顶部