返回 导航

Spark

hangge.com

Spark - Structured Streaming使用详解10(输出接收器1:console、memory sink)

作者:hangge | 2024-01-24 08:40

一、console sink

1,基本介绍

(1)console sink 用于将流式处理结果输出到控制台(终端)。这对于调试和测试非常有用,因为它允许我们在控制台上实时查看流式处理的结果。
(2)console sink 支持 AppendCompleteUpdate 这几种输出模式。

2,使用样例

下面是一个读取 socket 数据然后进行单词统计,并将结果输出到控制台中:
import org.apache.spark.sql.SparkSession

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("HelloStructuredStreaming")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建一个流式DataFrame,这里从Socket读取数据
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // 对流式DataFrame进行转换,将每一行数据切分成单词
    val words = lines.as[String].flatMap(_.split(" "))

    // 对单词进行分组和聚合
    val wordCounts = words.groupBy("value").count()

    // 启动查询, 把结果打印到控制台
    val query = wordCounts.writeStream
      .outputMode("complete") // 或者选择其他适合的输出模式
      .format("console") // 输出至控制台
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

3,运行测试

(1)我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(2)程序启动后,我们在该终端中输入一些文本数据:

(3)Structured Streaming 应用程序这边将会实时处理输入的文本数据并输出结果:

二、memory sink

1,基本介绍

(1)memory sink 也是用于测试,其将统计结果全部输入内存中指定的表中,然后可以通过 sql 与从表中查询数据。
注意:如果数据量非常大, 可能会发送内存溢出。

(2)memory sink 支持 AppendComplete 这几种输出模式。

2,使用样例

    下面是一个读取 socket 数据然后进行单词统计,并将结果输出到内存中。并且为了便于观察结果,我们还创建了个定时任务定时从内存表中读取并打印数据:
import java.util.{Timer, TimerTask}
import org.apache.spark.sql.SparkSession

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("HelloStructuredStreaming")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建一个流式DataFrame,这里从Socket读取数据
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // 对流式DataFrame进行转换,将每一行数据切分成单词
    val words = lines.as[String].flatMap(_.split(" "))

    // 对单词进行分组和聚合
    val wordCounts = words.groupBy("value").count()

    // 启动查询, 把输出至内存
    val query = wordCounts.writeStream
      .outputMode("complete") // 或者选择其他适合的输出模式
      .format("memory") // 输出至内存
      .queryName("word_count") // 内存临时表名
      .start()

    // 测试使用定时器执行查询表
    val timer = new Timer(true)
    val task: TimerTask = new TimerTask {
      override def run(): Unit = spark.sql("select * from word_count").show
    }
    timer.scheduleAtFixedRate(task, 0, 10000)

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

3,运行测试

(1)我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(2)程序启动后,我们在该终端中输入一些文本数据:

(3)Structured Streaming 应用程序这边的控制台输出内容如下:
评论

全部评论(0)

回到顶部