Spark - Structured Streaming使用详解11(输出接收器2:file sink)
作者:hangge | 2024-01-25 08:30
三、file sink
1,基本介绍
(1)file sink 用于将流式处理结果写入文件系统。它允许我们将流数据输出到本地文件系统或分布式文件系统(如 HDFS)中。支持的数据格式有 text、csv、json、parquet、orc 等。
(2)file sink 仅支持 Append 输出模式。
2,使用样例
下面是一个读取 socket 数据,然后把读取到的单词和单词的反转组成 json 格式写入到目录中:object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("HelloStructuredStreaming") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 创建一个流式DataFrame,这里从Socket读取数据 val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() // 反转单词 val words = lines.as[String].flatMap(line => { line.split("\\W+").map(word => { (word, word.reverse) }) }).toDF("原单词", "反转单词") // 启动查询, 把结果输出至文件 val query = words.writeStream .outputMode("append") .format("json") // 支持 "orc", "json", "csv"等 .option("path", "./output") // 输出目录 .option("checkpointLocation", "./ck1") // 必须指定 checkpoint 目录 .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
3,运行测试
(1)我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(2)程序启动后,我们在该终端中依次输入如下两行文本数据:
hangge baidu a 123 apple
(3)可以看到在指定的目录下会自动生成两个 json 文件,两个文件的内容分别如下:
全部评论(0)