Spark - Structured Streaming使用详解5(SQL语法、DSL语法)
作者:hangge | 2024-01-15 08:35
从输入源获取数据后,我们就可以在 streaming DataFrames / Datasets上 应用各种操作。操作方式主要分两种:一种是直接执行 sql,另一种则是特定类型的 api(DSL)。下面分别进行介绍。
(2)程序启动后,我们在指定目录下添加一个 people.json 文件,文件内容如下:
(3)接着可以看到控制台输出如下内容:
(2)程序启动后,我们在指定目录下添加一个 people.json 文件,文件内容如下:
(3)运行结果同上面是一样的:
(2)程序启动后,我们在指定目录下添加一个 people.json 文件,文件内容如下:
(3)运行结果如下:
六、SQL 语法
1,基本介绍
- SQL 语法是一种结构化查询语言,用于处理和管理关系型数据。在 Spark SQL 中,我们可以使用标准的 SQL 查询语句来对注册的临时视图或者全局视图进行操作。
- SQL 语法非常直观,对于熟悉 SQL 的用户来说非常友好,尤其是那些习惯使用 SQL 进行数据查询和分析的开发者。
- Spark SQL 的 SQL 语法包括常见的 SQL 查询语句,例如 SELECT、FROM、WHERE、GROUP BY、ORDER BY 等,还支持 JOIN 操作、子查询、聚合函数以及其他高级功能。
2,使用样例
(1)下面代码我们将 DataFrame 注册为临时视图,然后使用 SQL 语法进行数据处理:
import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.types.{LongType, StringType, StructType} object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("Hello") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 定义 Schema, 用于指定列名以及列中的数据类型 val peopleSchema: StructType = new StructType() .add("name", StringType) .add("age", LongType) .add("sex", StringType) // 创建一个流式DataFrame,这里从文件系统中读取数据 val peopleDF: DataFrame = spark.readStream .format("json") .schema(peopleSchema) .load("/Volumes/BOOTCAMP/test") // 必须是目录 // 创建临时表 peopleDF.createOrReplaceTempView("people") // 查询年龄大于20的所有人员信息 val df: DataFrame = spark.sql("select * from people where age > 20") // 启动查询, 把结果打印到控制台 val query = df.writeStream .outputMode("append") // 使用append输出模式 .format("console") .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
(2)程序启动后,我们在指定目录下添加一个 people.json 文件,文件内容如下:
{"name": "hangge","age": 100,"sex": "male"} {"name": "andy","age": 30,"sex": "male"} {"name": "justin","age": 19,"sex": "male"} {"name": "lisi","age": 18,"sex": "male"} {"name": "zs","age": 10,"sex": "female"} {"name": "zhiling","age": 40,"sex": "female"}
(3)接着可以看到控制台输出如下内容:
七、DSL 语法
1,基本介绍
- 特定领域语言(domain-specific language,DSL)允许我们使用编程语言(如 Scala、Java、Python)中的方法和函数来操作数据。使用 DSL 语法风格不必去创建临时视图。
- DSL 语法通过函数式编程的方式来构建查询操作,使用方法调用和操作符来代替 SQL 语句,这样的代码更加灵活和易于维护。DSL 语法提供了类型安全、编译时检查和代码提示等优点。
2,DataFrame 结合 DSL 语法的使用(弱类型 api)
(1)下面代码我们使用 DSL 语法对 DataFrame 进行数据处理:
import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.types.{LongType, StringType, StructType} object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("Hello") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 定义 Schema, 用于指定列名以及列中的数据类型 val peopleSchema: StructType = new StructType() .add("name", StringType) .add("age", LongType) .add("sex", StringType) // 创建一个流式DataFrame,这里从文件系统中读取数据 val peopleDF: DataFrame = spark.readStream .format("json") .schema(peopleSchema) .load("/Volumes/BOOTCAMP/test") // 必须是目录 // 查询年龄大于20的所有人员信息 val df: DataFrame = peopleDF.select("name","age", "sex").where("age > 20") // 启动查询, 把结果打印到控制台 val query = df.writeStream .outputMode("append") // 使用append输出模式 .format("console") .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
(2)程序启动后,我们在指定目录下添加一个 people.json 文件,文件内容如下:
{"name": "hangge","age": 100,"sex": "male"} {"name": "andy","age": 30,"sex": "male"} {"name": "justin","age": 19,"sex": "male"} {"name": "lisi","age": 18,"sex": "male"} {"name": "zs","age": 10,"sex": "female"} {"name": "zhiling","age": 40,"sex": "female"}
(3)运行结果同上面是一样的:
3,Dataset 结合 DSL 语法的使用(强类型 api)
(1)下面代码我们使用 DSL 语法对 Dataset 进行数据处理:
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.types.{LongType, StringType, StructType} // 定义一个People样例类 case class People(name: String, age: Long, sex: String) object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("Hello") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 定义 Schema, 用于指定列名以及列中的数据类型 val peopleSchema: StructType = new StructType() .add("name", StringType) .add("age", LongType) .add("sex", StringType) // 创建一个流式DataFrame,这里从文件系统中读取数据 val peopleDF: DataFrame = spark.readStream .format("json") .schema(peopleSchema) .load("/Volumes/BOOTCAMP/test") // 必须是目录 // 转成 ds val peopleDS: Dataset[People] = peopleDF.as[People] // 查询年龄大于20的所有人员信息 val ds: Dataset[String] = peopleDS.filter(_.age > 20).map(_.name) // 启动查询, 把结果打印到控制台 val query = ds.writeStream .outputMode("append") // 使用append输出模式 .format("console") .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
(2)程序启动后,我们在指定目录下添加一个 people.json 文件,文件内容如下:
{"name": "hangge","age": 100,"sex": "male"} {"name": "andy","age": 30,"sex": "male"} {"name": "justin","age": 19,"sex": "male"} {"name": "lisi","age": 18,"sex": "male"} {"name": "zs","age": 10,"sex": "female"} {"name": "zhiling","age": 40,"sex": "female"}
(3)运行结果如下:
全部评论(0)