返回 导航

Spark

hangge.com

Spark - Structured Streaming使用详解6(基于事件时间EventTime的窗口操作)

作者:hangge | 2024-01-16 08:46

八、基于事件时间(Event Time)的窗口操作

1,基本介绍

(1)在 Structured Streaming 中, 可以按照事件发生时的时间对数据进行聚合操作,即基于 event-time(时间时间)进行操作。
  • 在这种机制下,不必考虑 Spark 陆续接收事件的顺序是否与事件发生的顺序一致,也不必考虑事件到达 Spark 的时间与事件发生时间的关系。因此,它在提高数据处理精度的同时,大大减少了开发者的工作量。

(2)举个例子:我们现在想计算 10 分钟内的单词,每 5 分钟更新一次,也就是说在 10 分钟窗口 12:00 - 12:1012:05 - 12:1512:10 - 12:20 等之间收到的单词量。注意,12:00 - 12:10 表示数据在 12:00 之后 12:10 之前到达.
  • 现在,考虑一下在 12:07 收到的单词。单词应该增加对应于两个窗口 12:00 - 12:10 12:05 - 12:15 的计数。因此,计数将由分组键(即单词)和窗口(可以从事件时间计算)索引。统计后的结果应该是下面这样的。

2,event-time 窗口生成规则

(1)窗口生成规则的关键代码在 org.apache.spark.sql.catalyst.analysis.TimeWindowing 中:
// 窗口个数
maxNumOverlapping <- ceil(windowDuration / slideDuration)
for (i <- 0 until maxNumOverlapping)
   lastStart <- timestamp - (timestamp - startTime + slideDuration) % slideDuration
 windowStart <- lastStart - i * slideDuration
 windowEnd <- windowStart + windowDuration
 return windowStart, windowEnd

(2)Spark 会将 event-time 作为“初始窗口”的结束时间,然后按照窗口滑动宽度逐渐向时间轴前方推进,直到某个窗口不再包含该 event-time 为止,最终以“初始窗口”与“结束窗口”之间的若干个窗口作为最终生成的 event-time 的时间窗口。

(3)每个窗口的起始时间与结束时间都是前闭后开的区间,因此初始窗口和结束窗口都不会包含 event-time,最终不会被使用。得到窗口如下:

3,使用样例

(1)下面代码我们从一个本地端口(localhost:9999)接收文本数据流,将文本数据按单词进行切割,并在指定的时间窗口(窗口时长 10 分钟,滑动步长 5 分钟)内计算每个单词的出现次数,然后将结果输出到控制台。
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("Hello")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建一个流式DataFrame,这里从socket中读取数据
    val lines: DataFrame = spark.readStream
      .format("socket") // 设置数据源
      .option("host", "localhost")
      .option("port", 9999)
      .option("includeTimestamp", true)// 给产生的数据自动添加时间
      .load()

    // 把行切割成单词, 保留时间戳
    val words: DataFrame = lines.as[(String, Timestamp)]
      .flatMap(line => {
        line._1.split(" ").map((_, line._2))
       })
      .toDF("word", "timestamp")

    // 按照窗口和单词分组, 并且计算每组的单词的个数
    val wordCounts: Dataset[Row] = words.groupBy(
      // 调用 window 函数, 返回的是一个 Column,参数(df中表示时间戳的列、窗口长度、滑动步长)
      window($"timestamp", "10 minutes", "5 minutes"), $"word"
    ).count().orderBy($"window")  // 计数, 并按照窗口排序

    // 启动查询, 把结果打印到控制台
    val query = wordCounts.writeStream
      .outputMode("complete") // 使用complete输出模式
      .format("console")
      .option("truncate", "false")  // 不截断.为了在控制台能看到完整信息, 最好设置为 false
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

(2)测试一下,首先我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(3)然后我们在该终端中输入一些文本数据(此时时间为 10:22):
a b

(4)可以看到 Structured Streaming 应用程序这边的控制台输出如下内容:

(5)然后等待 5 分钟后,我们再次在终端中输入一些文本数据(此刻时间为 10:27):
b c

(6)可以看到 Structured Streaming 应用程序这边的控制台输出如下内容:

附:使用数据中附带的真实事件时间

1,样例代码

    上面的样例为方便演示我们是给接收到数据自动添加个当前时间作为事件时间。但在实际生产环境中,原始数据会自动包含事件时间,我们只需要将其解析出来使用即可。下面是修改后的代码:
import org.apache.spark.sql.functions.window
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("Hello")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建一个流式DataFrame,这里从socket中读取数据
    val lines: DataFrame = spark.readStream
      .format("socket") // 设置数据源
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // 解析原始输入数据,假设原始数据格式为"单词1 单词2 单词3,事件时间"
    val wordsWithTimestamp: DataFrame = lines.as[String]
      .map(line => {
        val parts = line.split(",")
        val words = parts(0).split(" ")
        val timestamp = Timestamp.valueOf(parts(1).trim())
        (words, timestamp)
      })
      .flatMap { case (words, timestamp) =>
        words.map(word => (word, timestamp))
      }
      .toDF("word", "timestamp")

    // 按照窗口和单词分组, 并且计算每组的单词的个数
    val wordCounts: Dataset[Row] = wordsWithTimestamp.groupBy(
      // 调用 window 函数, 返回的是一个 Column,参数(df中表示时间戳的列、窗口长度、滑动步长)
      window($"timestamp", "10 minutes", "5 minutes"), $"word"
    ).count().orderBy($"window")  // 计数, 并按照窗口排序

    // 启动查询, 把结果打印到控制台
    val query = wordCounts.writeStream
      .outputMode("complete") // 使用complete输出模式
      .format("console")
      .option("truncate", "false")  // 不截断.为了在控制台能看到完整信息, 最好设置为 false
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

2,运行测试

(1)首先我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(2)然后我们在该终端中输入一些文本数据:
a b,2023-09-05 10:31:00
b,2023-09-05 10:33:00

(3)可以看到 Structured Streaming 应用程序这边的控制台输出如下内容:

(4)然后我们再在该终端中输入一些文本数据:
1 2,2023-09-01 08:30:00

(5)可以看到 Structured Streaming 应用程序这边的控制台输出如下内容:
评论

全部评论(0)

回到顶部