返回 导航

Spark

hangge.com

Spark - Structured Streaming使用详解11(输出接收器2:file sink)

作者:hangge | 2024-01-25 08:30

三、file sink

1,基本介绍

(1)file sink 用于将流式处理结果写入文件系统。它允许我们将流数据输出到本地文件系统或分布式文件系统(如 HDFS)中。支持的数据格式有 textcsvjsonparquetorc 等。
(2)file sink 仅支持 Append 输出模式。

2,使用样例

下面是一个读取 socket 数据,然后把读取到的单词和单词的反转组成 json 格式写入到目录中:
object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("HelloStructuredStreaming")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建一个流式DataFrame,这里从Socket读取数据
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // 反转单词
    val words = lines.as[String].flatMap(line => {
      line.split("\\W+").map(word => {
        (word, word.reverse)
      })
    }).toDF("原单词", "反转单词")

    // 启动查询, 把结果输出至文件
    val query = words.writeStream
      .outputMode("append")
      .format("json") // 支持 "orc", "json", "csv"等
      .option("path", "./output") // 输出目录
      .option("checkpointLocation", "./ck1")  // 必须指定 checkpoint 目录
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

3,运行测试

(1)我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(2)程序启动后,我们在该终端中依次输入如下两行文本数据:
hangge baidu a
123 apple

(3)可以看到在指定的目录下会自动生成两个 json 文件,两个文件的内容分别如下:

评论

全部评论(0)

回到顶部