返回 导航

Spark

hangge.com

Spark - Structured Streaming使用详解3(输入源2:文件系统)

作者:hangge | 2024-01-10 08:45

四、文件系统输入源

1,读取普通文件夹内的文件

(1)使用文件作为输入源时,Structured Streaming 会监控指定目录下的新文件,并读取其中的数据。下面是一个简单的样例代码:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("Hello")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 定义 Schema, 用于指定列名以及列中的数据类型
    val userSchema: StructType = new StructType().add("name", StringType).add("age", LongType)

    // 创建一个流式DataFrame,这里从文件系统中读取数据
    val user: DataFrame = spark.readStream
      .format("csv")
      .schema(userSchema)
      .load("/Volumes/BOOTCAMP/test")  // 必须是目录

    // 启动查询, 把结果打印到控制台
    val query = user.writeStream
      .outputMode("append") // 使用append输出模式
      .format("console")
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

(2)程序启动后,它会开始监控指定目录,并读取在监控目录中新产生文件的数据。我们可以测试一下,进入设定的监控目录,添加一个 csv 文件,文件内容如下:


(3)Structured Streaming 应用程序这边将会实时处理该文件的数据并输出结果:

2,读取自动分区的文件夹内的文件

(1)当文件夹被命名为 “key=value” 形式时, Structured Streaming 会自动递归遍历当前文件夹下的所有子文件夹, 并根据文件名实现自动分区。
注意:如果文件夹的命名规则不是“key=value”形式, 则不会触发自动分区。另外, 同级目录下的文件夹的命名规则必须一致。

(2)程序代码还是跟上面一样,没有变化。不过我们这次在目标文件夹下创建如下目录结构:

(3)两个文件内容分别如下:

(4)运行程序,可以看到控制台输出内容如下:
评论

全部评论(0)

回到顶部