Spark - Structured Streaming使用详解3(输入源2:文件系统)
作者:hangge | 2024-01-10 08:45
四、文件系统输入源
1,读取普通文件夹内的文件
(1)使用文件作为输入源时,Structured Streaming 会监控指定目录下的新文件,并读取其中的数据。下面是一个简单的样例代码:
import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.types.{LongType, StringType, StructType} object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("Hello") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 定义 Schema, 用于指定列名以及列中的数据类型 val userSchema: StructType = new StructType().add("name", StringType).add("age", LongType) // 创建一个流式DataFrame,这里从文件系统中读取数据 val user: DataFrame = spark.readStream .format("csv") .schema(userSchema) .load("/Volumes/BOOTCAMP/test") // 必须是目录 // 启动查询, 把结果打印到控制台 val query = user.writeStream .outputMode("append") // 使用append输出模式 .format("console") .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
(2)程序启动后,它会开始监控指定目录,并读取在监控目录中新产生文件的数据。我们可以测试一下,进入设定的监控目录,添加一个 csv 文件,文件内容如下:
(3)Structured Streaming 应用程序这边将会实时处理该文件的数据并输出结果:
2,读取自动分区的文件夹内的文件
(1)当文件夹被命名为 “key=value” 形式时, Structured Streaming 会自动递归遍历当前文件夹下的所有子文件夹, 并根据文件名实现自动分区。
注意:如果文件夹的命名规则不是“key=value”形式, 则不会触发自动分区。另外, 同级目录下的文件夹的命名规则必须一致。
(2)程序代码还是跟上面一样,没有变化。不过我们这次在目标文件夹下创建如下目录结构:
(3)两个文件内容分别如下:
(4)运行程序,可以看到控制台输出内容如下:
全部评论(0)