Spark - Structured Streaming使用详解14(Trigger 触发器)
作者:hangge | 2024-01-31 08:45
流式查询的触发器(Trigger)定义了流式数据处理的时间,流式查询根据触发器的不同,可以是根据固定的批处理间隔进行微批处理查询,也可以是连续的查询。下面通过样例演示各种触发器的使用。
(3)上面程序运行后处理完一批次数据即退出程序:
(2)下面是一个使用连续处理触发器的样例:
十四、Trigger(触发器)
1,默认触发器
(1)如果我们没有显示的设定触发器,则尽可能块的处理每个批次的数据。如果无数据可用,则处于阻塞状态,等待数据流入。
(2)下面的样例我们没有指定触发器,即使用默认触发器:
import org.apache.spark.sql.SparkSession object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("HelloStructuredStreaming") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 创建一个流式DataFrame,这里从Socket读取数据 val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() // 对流式DataFrame进行转换,将每一行数据切分成单词,然后进行单词统计 val wordCounts = lines.as[String] .flatMap(_.split(" ")) .groupBy("value") .count() // 启动查询, 把结果打印到控制台 val query = wordCounts.writeStream .outputMode("complete") .format("console") .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
2,固定周期的微批处理触发器
(1)使用该触发器查询会在微批处理模式下执行,其中微批处理将以用户指定的间隔执行,具体规则如下:
- 如果以前的微批处理在间隔内完成,则引擎会等待间隔结束,然后开启下一个微批次
- 如果前一个微批处理在一个间隔内没有完成(即错过了间隔边界),则下个微批处理会在上一个完成之后立即启动(不会等待下一个间隔边界)
- 如果没有新数据可用,则不会启动微批次,适用于流式数据的批处理作业。
(2)下面是一个使用固定周期的微批处理触发器的样例:
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.Trigger object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("HelloStructuredStreaming") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 创建一个流式DataFrame,这里从Socket读取数据 val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() // 对流式DataFrame进行转换,将每一行数据切分成单词,然后进行单词统计 val wordCounts = lines.as[String] .flatMap(_.split(" ")) .groupBy("value") .count() // 启动查询, 把结果打印到控制台 val query = wordCounts.writeStream .outputMode("complete") .format("console") .trigger(Trigger.ProcessingTime("2 seconds")) // 间隔时间为2秒 .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
3,一次性微批次触发器
(1)使用该触发器查询将在所有可用数据上执行一次微批次处理,然后自行停止。如果我们希望定期启动集群,然后处理集群关闭期间产生的数据,然后再关闭集群,这种情况下很有用。它可以显著的降低成本,一般用于非实时的数据分析。
(2)下面是一个使用一次性微批次触发器的样例:
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.Trigger object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("HelloStructuredStreaming") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 创建一个流式DataFrame,这里从Socket读取数据 val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() // 对流式DataFrame进行转换,将每一行数据切分成单词,然后进行单词统计 val wordCounts = lines.as[String] .flatMap(_.split(" ")) .groupBy("value") .count() // 启动查询, 把结果打印到控制台 val query = wordCounts.writeStream .outputMode("complete") .format("console") .trigger(Trigger.Once()) .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
(3)上面程序运行后处理完一批次数据即退出程序:
4,连续处理触发器
(1)连续处理触发器是 2.3 引入,用于以超低延迟处理数据,可以实现低至 1ms 的处理延迟。并实现了至少一次(at-least-once)的语义。
注意:微批处理模式虽然实现了严格一次(exactly-once)的语义,但是最低有 100ms 的延迟。对有些类型的查询,可以切换到这个模式,而不需要修改应用的逻辑。(不用更改 df/ds 操作)
连续处理模式支持如下查询:
- 操作:支持 select,map,flatMap,mapPartitions ...和 selections(where,filter...),不支持聚合操作
- 数据源:
- kafka 所有选项都支持
- rate source
- sink:
- 所有的 kafka 参数都支持
- memory sink
- console sink
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.Trigger object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("HelloStructuredStreaming") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 模拟一个包含两列的实时数据流:timestamp 和 value val streamDF = spark.readStream .format("rate") .option("rowsPerSecond", 100) .load() val processedStream = streamDF.selectExpr("timestamp", "rand() as value") // 启动查询, 把结果打印到控制台 val query = processedStream.writeStream .outputMode("append") .format("console") .trigger(Trigger.Continuous("200 millisecond")) //200毫秒触发一次 .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
全部评论(0)