Spark - Structured Streaming使用详解11(输出接收器2:file sink)
作者:hangge | 2024-01-25 08:30
三、file sink
1,基本介绍
(1)file sink 用于将流式处理结果写入文件系统。它允许我们将流数据输出到本地文件系统或分布式文件系统(如 HDFS)中。支持的数据格式有 text、csv、json、parquet、orc 等。
(2)file sink 仅支持 Append 输出模式。
2,使用样例
下面是一个读取 socket 数据,然后把读取到的单词和单词的反转组成 json 格式写入到目录中:object Hello {
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("HelloStructuredStreaming")
.master("local[*]")
.getOrCreate()
// 导入隐式转换
import spark.implicits._
// 创建一个流式DataFrame,这里从Socket读取数据
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// 反转单词
val words = lines.as[String].flatMap(line => {
line.split("\\W+").map(word => {
(word, word.reverse)
})
}).toDF("原单词", "反转单词")
// 启动查询, 把结果输出至文件
val query = words.writeStream
.outputMode("append")
.format("json") // 支持 "orc", "json", "csv"等
.option("path", "./output") // 输出目录
.option("checkpointLocation", "./ck1") // 必须指定 checkpoint 目录
.start()
// 等待应用程序终止
query.awaitTermination()
//关闭 Spark
spark.stop()
}
}
3,运行测试
(1)我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(2)程序启动后,我们在该终端中依次输入如下两行文本数据:
hangge baidu a 123 apple
(3)可以看到在指定的目录下会自动生成两个 json 文件,两个文件的内容分别如下:

全部评论(0)