Spark - Structured Streaming使用详解3(输入源2:文件系统)
作者:hangge | 2024-01-10 08:45
四、文件系统输入源
1,读取普通文件夹内的文件
(1)使用文件作为输入源时,Structured Streaming 会监控指定目录下的新文件,并读取其中的数据。下面是一个简单的样例代码:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
object Hello {
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("Hello")
.master("local[*]")
.getOrCreate()
// 导入隐式转换
import spark.implicits._
// 定义 Schema, 用于指定列名以及列中的数据类型
val userSchema: StructType = new StructType().add("name", StringType).add("age", LongType)
// 创建一个流式DataFrame,这里从文件系统中读取数据
val user: DataFrame = spark.readStream
.format("csv")
.schema(userSchema)
.load("/Volumes/BOOTCAMP/test") // 必须是目录
// 启动查询, 把结果打印到控制台
val query = user.writeStream
.outputMode("append") // 使用append输出模式
.format("console")
.start()
// 等待应用程序终止
query.awaitTermination()
//关闭 Spark
spark.stop()
}
}
(2)程序启动后,它会开始监控指定目录,并读取在监控目录中新产生文件的数据。我们可以测试一下,进入设定的监控目录,添加一个 csv 文件,文件内容如下:

(3)Structured Streaming 应用程序这边将会实时处理该文件的数据并输出结果:

2,读取自动分区的文件夹内的文件
(1)当文件夹被命名为 “key=value” 形式时, Structured Streaming 会自动递归遍历当前文件夹下的所有子文件夹, 并根据文件名实现自动分区。
注意:如果文件夹的命名规则不是“key=value”形式, 则不会触发自动分区。另外, 同级目录下的文件夹的命名规则必须一致。
(2)程序代码还是跟上面一样,没有变化。不过我们这次在目标文件夹下创建如下目录结构:
(3)两个文件内容分别如下:

(4)运行程序,可以看到控制台输出内容如下:
全部评论(0)