返回 导航

Spark

hangge.com

Spark - Structured Streaming使用详解1(基本概念介绍、简单样例)

作者:hangge | 2024-01-05 09:05

一、基本概念介绍

1,什么是 Structured Streaming

(1)Structured Streaming Spark2.0 开始引入的一套新的流式计算模型。该组件进一步降低了处理数据的延迟时间,它实现了“有且仅有一次(Exectly Once” 语义,可以保证数据被精准消费。
    默认情况下,在内部 Structured Streaming 查询使用微批处理引擎(micro-batch processing engine)处理,微批处理引擎把流数据当做一系列的小批 jobsmall batch jobs)来处理。所以,延迟低至 100 毫秒。而从 Spark2.3 开始又引入了一个新的低延迟处理模型:Continuous Processing,使得延迟低至 1 毫秒.

(2)总之,Structured Streaming 提供了快速、弹性、容错、end-to-end exactly-once 的流处理,而用户不需要对流进行推理(比如 spark streaming 中的流的各种转换)。

2,Structured Streaming 与 Spark SQL 的关系

  • Structured Streaming 基于 Spark SQL 引擎,是一个具有弹性和容错的流式处理引擎。使用 Structure Streaming 处理流式计算的方式和使用批处理计算静态数据(表中的数据)的方式是一样的。
  • 随着流数据的持续到达,Spark SQL 引擎持续不断的运行并得到最终的结果。我们可以使用 Dataset/DataFrame API 来表达流的聚合,事件-时间窗口(event-time windows),流-批处理连接(stream-to-batch joins)等等。这些计算都是运行在被优化过的 Spark SQL 引擎上。最终,通过 chekcpoinWALsWrite-Ahead Logs),系统保证 end-to-end exactly-once
注意到目前,DataFrame/Dataset 的有些操作 Streaming DataFrame/Dataset 还不支持:
  • Limittake 操作不支持流式数据集。也就是无法在流式数据集上使用这些操作来限制和获取前 N 行。
  • 流式数据集上不支持进行 Distinct 操作。
  • 流式数据集上不支持某些类型的外连接操作。
  • 排序操作仅在聚合后并且在“Complete”输出模式下才支持流式数据集上的操作。
  • 在"Update"和"Complete"模式下,不支持在流式数据集上链式调用多个具有状态的操作。
  • 在"Append"模式下,以下操作后跟其他具有状态的操作也不受支持:
    • 流-流时间间隔连接(内连接/外连接)
    • flatMapGroupsWithState
    • 已知的解决方法是将流式查询拆分为多个查询,每个查询中只包含一个具有状态的操作,并确保每个查询的端到端恰好一次语义。对于最后一个查询,确保端到端恰好一次语义是可选的。
  • 有些方法不能直接用于查询和返回结果, 因为他们用在流式数据上没有意义:
    • count() 不能返回单行数据,必须是 s.groupBy().count()
    • foreach() 不能直接使用,而是使用 ds.writeStream.foreach(...)
    • show() 不能直接使用,而是使用 console sink

3,spark streaming 与 structured streaming 比较

(1)处理模型:
  • Spark Streaming:使用微批处理模型,将实时数据流划分为小批次,并在每个批次上执行数据处理操作。批次大小可以根据需求配置,但通常在几秒到数分钟之间。通常适用于对延迟要求不高的场景。
  • Structured Streaming:也使用微批处理模型,但提供了更高层次的 API,使开发者可以使用类似于批处理的 API 来处理实时数据。Structured Streaming 隐藏了底层的批处理细节,让开发者可以将实时数据处理看作是一系列连续的批处理。更适用于需要更接近实时处理的场景。

(2)API 和编程模型:
  • Spark Streaming:使用 Discretized StreamsDStreamsAPI,需要开发者以函数式编程风格编写批处理逻辑。相对较低级,需要开发者显式处理状态和时间窗口等细节。
  • Structured Streaming:提供了更高层次的 DataFrameSQL API,允许开发者以 SQL 查询和 DataFrame 转换的方式来处理数据。这使得编程更加直观且类似于批处理,同时隐藏了状态管理和时间窗口等复杂性。

(3)数据的时间:
  • Spark Streaming 基于处理时间(Processing Time)将数据落入 window
  • Structured Streaming 基于事件时间(Event Time)将数据落入 window
说明:
  • 处理时间是数据到达处理系统的时间。
  • 事件时间是数据实际产生的时间,通常由数据中的时间戳表示。事件时间在流处理中很重要,因为数据可能乱序到达,需要通过水印(Watermark)机制来处理延迟数据。

4,事件时间和延迟数据的处理

(1)Structured streaming 与其他的流式引擎有很大的不同。许多系统要求用户自己维护运行的聚合,所以用户自己必须推理数据的一致性(at-least-onceor at-most-onceor exactly-once)。在 Structured streaming 模型中,当有新数据的时候 spark 负责更新结果表,从而减轻了用户的推理工作。

(2)我们来看下个模型如何处理基于事件时间的处理和迟到的数据。
  • Event-time 是指嵌入到数据本身的时间,或者指数据产生的时间。对大多数应用程序来说,我们想基于这个时间去操作数据。例如,如果我们获取 IoTInternet of Things)设备每分钟产生的事件数,我们更愿意使用数据产生时的时间(event-time in the data),而不是 spark 接收到这些数据时的时间。
  • 在这个模型中,event-time 是非常自然的表达。来自设备的每个时间都是表中的一行,event-time 是行中的一列。允许基于窗口的聚合(例如,每分钟的事件数)仅仅是 event-time 列上的特殊类型的分组(grouping)和聚合(aggregation):每个时间窗口是一个组,并且每一行可以属于多个窗口/组。因此,可以在静态数据集和数据流上进行基于事件时间窗口( event-time-window-based)的聚合查询,从而使用户操作更加方便。
  • 此外,该模型也可以自然的处理晚于 event-time 的数据,因为 spark 一直在更新结果表,所以它可以完全控制更新旧的聚合数据,或清除旧的聚合以限制中间状态数据的大小。
  • Spark 2.1 起,开始支持 watermark 来允许用于指定数据的超时时间(即接收时间比 event-time 晚多少),并允许引擎相应的清理旧状态。

5,输入表、结果表

(1)Structured Streaming 的关键思想是将持续不断的数据当做一个不断追加的表。将输入的流数据当做一张 “输入表”。把每一条到达的数据作为输入表的新的一行来追加。

(2)在输入表上执行的查询将会生成 “结果表”。每个触发间隔(trigger interval)(如:1s),新的行追加到输入表,最终更新结果表。无论何时更新结果表,我们都希望将更改的结果行 output 到外部存储/接收器(external sink)。

(3)下面通过一个 word count 例子说明下 Structured Streaming 编程模型:第一行表示从 socket 不断接收数据,第二行是时间轴,表示每隔 1 秒进行一次数据处理,第三行可以看成是之前提到的“unbound table”,而第四行为最终的 wordCounts 是结果集。当有新的数据到达时,Spark 会执行“增量”查询,并更新结果集;该示例设置为 Complete Mode,因此每次都将所有数据输出到控制台。
具体的执行过程;
  • 在第 1 秒时,此时到达的数据为"cat dog"和"dog dog",因此我们可以得到第 1 秒时的结果集 cat=1 dog=3,并输出到控制台;
  • 当第 2 秒时,到达的数据为"owl cat",此时"unbound table"增加了一行数据"owl cat",执行 word count 查询并更新结果集,可得第 2 秒时的结果集为 cat=2 dog=3 owl=1,并输出到控制台;
  • 当第 3 秒时,到达的数据为"dog"和"owl",此时"unbound table"增加两行数据"dog"和"owl",执行 word count 查询并更新结果集,可得第 3 秒时的结果集为 cat=2 dog=4 owl=2
注意Structured Streaming 不会实现整个表。它从流式数据源读取最新的可用数据,持续不断的处理这些数据,然后更新结果,并且会丢弃原始数据。它仅保持最小的中间状态的数据,,以用于更新结果(例如上面例子中的中间 counts

6,输出模式(outputMode)

(1)Complete Mode:即每次更新结果集时,都将整个结果集写入外部存储中;由存储连接器(storage connector)决定如何处理整个表的写入(类似于 spark streaming 中的有转态的转换)。注意:使用该模式必须要有聚合,同时该模式使用 watermask 无效。

(2)Append Mode(默认输出模式):从上次触发结束开始算起, 仅仅把那些新追加到结果表中的行写到外部存储(类似于无状态的转换)。该模式仅适用于不会更改结果表中行的那些查询。采用这种输出模式, 可以保证每行数据仅输出一次。注意:如果有聚合操作, 则必须添加 wartemark,否则不支持此种模式。
  • Append 的语义将保证,一旦输出了某条 key,未来就不会再输出同一个 key
  • 所以,在下图 12:10 这个批次直接输出 12:00-12:10|cat|1, 12:05-12:15|cat|1 将是错误的,因为在 12:20 将结果更新为了 12:00-12:10|cat|2,但是 Append 模式下却不会再次输出 12:00-12:10|cat|2,因为前面输出过了同一条 key 12:00-12:10|cat 的结果 12:00-12:10|cat|1
  • 为了解决这个问题,在 Append 模式下,Structured Streaming 需要知道,某一条 key 的结果什么时候不会再更新了。当确认结果不会再更新的时候,就可以将结果进行输出。

(3)Update Mode:从上次触发结束开始算起, 仅仅在结果表中更新的行(新增或修改)会写入到外部存储. 此模式从 2.1.1 可用。注意:使用该模式可以包含聚合操作,也可以不包含聚合操作。Update Mode Complete Mode 的不同在于 Update Mode 仅仅输出改变的那些行. 如果查询不包括聚合操作, 则等同于 Append Mode
  • 如下图所示,在 Update 模式中,只有本执行批次 State 中被更新了的条目会被输出:
  • 12:10 这个执行批次,State 中全部 2 条都是新增的(因而也都是被更新了的),所以输出全部 2 条;
  • 12:20 这个执行批次,State 2 条是被更新了的、 4 条都是新增的(因而也都是被更新了的),所以输出全部 6 条;
  • 12:30 这个执行批次,State 4 条是被更新了的,所以输出 4 条。这些需要特别注意的一点是,如 Append 模式一样,本执行批次中由于(通过 watermark 机制)确认 12:00-12:10 这个 window 不会再被更新,因而将其从 State 中去除,但没有因此产生输出。

7,容错语义

  • "Exactly-Once" 是一种消息传递的语义,用于确保消息在系统中只被处理一次。一般用于分布式系统中,特别是在消息队列或流处理系统中。
  • 提供端到端的 exactly-once 语义是 Structured Streaming 设计的主要目标。为了达成这一目的,spark 设计了结构化流数据源,接收器和执行引擎(Structured Streaming sourcesthe sinks and the execution engine)以可靠的跟踪处理的进度,以便能够对任何失败能够重新启动或者重新处理。
  • 每种流数据源假定都有 offsets(类似于 Kafka offsets)用于追踪在流中的读取位置。引擎使用 checkpointWALs 来记录在每个触发器中正在处理的数据的 offset 范围。结合可重用的数据源(replayable source)和幂等接收器(idempotent sink), Structured Streaming 可以确保在任何失败的情况下端到端的 exactly-once 语义。

附:一个简单的 Structured Streaming 应用程序

1,功能说明

    我们使用 netcat 工具向 9999 端口不断的发送数据,而程序通过 Structured Streaming 读取端口数据并统计不同单词出现的次数。

2,代码编写

(1)首先我们编辑项目的 pom.xml 文件,添加 spark-core spark-sql 依赖:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.3.1</version>
</dependency>

(2)具体代码如下:
import org.apache.spark.sql.SparkSession

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("HelloStructuredStreaming")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建一个流式DataFrame,这里从Socket读取数据
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // 对流式DataFrame进行转换,将每一行数据切分成单词
    val words = lines.as[String].flatMap(_.split(" "))

    // 对单词进行分组和聚合
    val wordCounts = words.groupBy("value").count()

    // 启动查询, 把结果打印到控制台
    val query = wordCounts.writeStream
      .outputMode("complete") // 或者选择其他适合的输出模式
      .format("console")
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

3,运行测试

(1)我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(2)程序启动后,我们在该终端中输入一些文本数据:

(3)Structured Streaming 应用程序这边将会实时处理输入的文本数据并输出结果:
评论

全部评论(0)

回到顶部