返回 导航

Spark

hangge.com

Spark - Structured Streaming使用详解14(Trigger 触发器)

作者:hangge | 2024-01-31 08:45
    流式查询的触发器(Trigger)定义了流式数据处理的时间,流式查询根据触发器的不同,可以是根据固定的批处理间隔进行微批处理查询,也可以是连续的查询。下面通过样例演示各种触发器的使用。

十四、Trigger(触发器)

1,默认触发器

(1)如果我们没有显示的设定触发器,则尽可能块的处理每个批次的数据。如果无数据可用,则处于阻塞状态,等待数据流入。
(2)下面的样例我们没有指定触发器,即使用默认触发器:
import org.apache.spark.sql.SparkSession

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("HelloStructuredStreaming")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建一个流式DataFrame,这里从Socket读取数据
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // 对流式DataFrame进行转换,将每一行数据切分成单词,然后进行单词统计
    val wordCounts = lines.as[String]
      .flatMap(_.split(" "))
      .groupBy("value")
      .count()

    // 启动查询, 把结果打印到控制台
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

2,固定周期的微批处理触发器

(1)使用该触发器查询会在微批处理模式下执行,其中微批处理将以用户指定的间隔执行,具体规则如下:
  • 如果以前的微批处理在间隔内完成,则引擎会等待间隔结束,然后开启下一个微批次 
  • 如果前一个微批处理在一个间隔内没有完成(即错过了间隔边界),则下个微批处理会在上一个完成之后立即启动(不会等待下一个间隔边界)
  • 如果没有新数据可用,则不会启动微批次,适用于流式数据的批处理作业。
(2)下面是一个使用固定周期的微批处理触发器的样例:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("HelloStructuredStreaming")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建一个流式DataFrame,这里从Socket读取数据
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // 对流式DataFrame进行转换,将每一行数据切分成单词,然后进行单词统计
    val wordCounts = lines.as[String]
      .flatMap(_.split(" "))
      .groupBy("value")
      .count()

    // 启动查询, 把结果打印到控制台
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .trigger(Trigger.ProcessingTime("2 seconds")) // 间隔时间为2秒
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

3,一次性微批次触发器

(1)使用该触发器查询将在所有可用数据上执行一次微批次处理,然后自行停止。如果我们希望定期启动集群,然后处理集群关闭期间产生的数据,然后再关闭集群,这种情况下很有用。它可以显著的降低成本,一般用于非实时的数据分析。
(2)下面是一个使用一次性微批次触发器的样例:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("HelloStructuredStreaming")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建一个流式DataFrame,这里从Socket读取数据
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // 对流式DataFrame进行转换,将每一行数据切分成单词,然后进行单词统计
    val wordCounts = lines.as[String]
      .flatMap(_.split(" "))
      .groupBy("value")
      .count()

    // 启动查询, 把结果打印到控制台
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .trigger(Trigger.Once())
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

(3)上面程序运行后处理完一批次数据即退出程序:

4,连续处理触发器

(1)连续处理触发器是 2.3 引入,用于以超低延迟处理数据,可以实现低至 1ms 的处理延迟。并实现了至少一次(at-least-once)的语义。
注意:微批处理模式虽然实现了严格一次(exactly-once)的语义,但是最低有 100ms 的延迟。对有些类型的查询,可以切换到这个模式,而不需要修改应用的逻辑。(不用更改 df/ds 操作)
连续处理模式支持如下查询:
  1. 操作:支持 selectmapflatMapmapPartitions ...和 selectionswherefilter...),不支持聚合操作
  2. 数据源:
    1. kafka 所有选项都支持
    2. rate source
  3. sink
    1. 所有的 kafka 参数都支持
    2. memory sink
    3. console sink

(2)下面是一个使用连续处理触发器的样例:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("HelloStructuredStreaming")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 模拟一个包含两列的实时数据流:timestamp 和 value
    val streamDF = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 100)
      .load()

    val processedStream = streamDF.selectExpr("timestamp", "rand() as value")

    // 启动查询, 把结果打印到控制台
    val query = processedStream.writeStream
      .outputMode("append")
      .format("console")
      .trigger(Trigger.Continuous("200 millisecond")) //200毫秒触发一次
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}
评论

全部评论(0)

回到顶部