Spark - Structured Streaming使用详解5(SQL语法、DSL语法)
作者:hangge | 2024-01-15 08:35
从输入源获取数据后,我们就可以在 streaming DataFrames / Datasets上 应用各种操作。操作方式主要分两种:一种是直接执行 sql,另一种则是特定类型的 api(DSL)。下面分别进行介绍。
(2)程序启动后,我们在指定目录下添加一个 people.json 文件,文件内容如下:
(3)接着可以看到控制台输出如下内容:
(2)程序启动后,我们在指定目录下添加一个 people.json 文件,文件内容如下:
(3)运行结果同上面是一样的:
(2)程序启动后,我们在指定目录下添加一个 people.json 文件,文件内容如下:
(3)运行结果如下:
六、SQL 语法
1,基本介绍
- SQL 语法是一种结构化查询语言,用于处理和管理关系型数据。在 Spark SQL 中,我们可以使用标准的 SQL 查询语句来对注册的临时视图或者全局视图进行操作。
- SQL 语法非常直观,对于熟悉 SQL 的用户来说非常友好,尤其是那些习惯使用 SQL 进行数据查询和分析的开发者。
- Spark SQL 的 SQL 语法包括常见的 SQL 查询语句,例如 SELECT、FROM、WHERE、GROUP BY、ORDER BY 等,还支持 JOIN 操作、子查询、聚合函数以及其他高级功能。
2,使用样例
(1)下面代码我们将 DataFrame 注册为临时视图,然后使用 SQL 语法进行数据处理:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
object Hello {
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("Hello")
.master("local[*]")
.getOrCreate()
// 导入隐式转换
import spark.implicits._
// 定义 Schema, 用于指定列名以及列中的数据类型
val peopleSchema: StructType = new StructType()
.add("name", StringType)
.add("age", LongType)
.add("sex", StringType)
// 创建一个流式DataFrame,这里从文件系统中读取数据
val peopleDF: DataFrame = spark.readStream
.format("json")
.schema(peopleSchema)
.load("/Volumes/BOOTCAMP/test") // 必须是目录
// 创建临时表
peopleDF.createOrReplaceTempView("people")
// 查询年龄大于20的所有人员信息
val df: DataFrame = spark.sql("select * from people where age > 20")
// 启动查询, 把结果打印到控制台
val query = df.writeStream
.outputMode("append") // 使用append输出模式
.format("console")
.start()
// 等待应用程序终止
query.awaitTermination()
//关闭 Spark
spark.stop()
}
}
(2)程序启动后,我们在指定目录下添加一个 people.json 文件,文件内容如下:
{"name": "hangge","age": 100,"sex": "male"}
{"name": "andy","age": 30,"sex": "male"}
{"name": "justin","age": 19,"sex": "male"}
{"name": "lisi","age": 18,"sex": "male"}
{"name": "zs","age": 10,"sex": "female"}
{"name": "zhiling","age": 40,"sex": "female"}
(3)接着可以看到控制台输出如下内容:

七、DSL 语法
1,基本介绍
- 特定领域语言(domain-specific language,DSL)允许我们使用编程语言(如 Scala、Java、Python)中的方法和函数来操作数据。使用 DSL 语法风格不必去创建临时视图。
- DSL 语法通过函数式编程的方式来构建查询操作,使用方法调用和操作符来代替 SQL 语句,这样的代码更加灵活和易于维护。DSL 语法提供了类型安全、编译时检查和代码提示等优点。
2,DataFrame 结合 DSL 语法的使用(弱类型 api)
(1)下面代码我们使用 DSL 语法对 DataFrame 进行数据处理:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
object Hello {
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("Hello")
.master("local[*]")
.getOrCreate()
// 导入隐式转换
import spark.implicits._
// 定义 Schema, 用于指定列名以及列中的数据类型
val peopleSchema: StructType = new StructType()
.add("name", StringType)
.add("age", LongType)
.add("sex", StringType)
// 创建一个流式DataFrame,这里从文件系统中读取数据
val peopleDF: DataFrame = spark.readStream
.format("json")
.schema(peopleSchema)
.load("/Volumes/BOOTCAMP/test") // 必须是目录
// 查询年龄大于20的所有人员信息
val df: DataFrame = peopleDF.select("name","age", "sex").where("age > 20")
// 启动查询, 把结果打印到控制台
val query = df.writeStream
.outputMode("append") // 使用append输出模式
.format("console")
.start()
// 等待应用程序终止
query.awaitTermination()
//关闭 Spark
spark.stop()
}
}
(2)程序启动后,我们在指定目录下添加一个 people.json 文件,文件内容如下:
{"name": "hangge","age": 100,"sex": "male"}
{"name": "andy","age": 30,"sex": "male"}
{"name": "justin","age": 19,"sex": "male"}
{"name": "lisi","age": 18,"sex": "male"}
{"name": "zs","age": 10,"sex": "female"}
{"name": "zhiling","age": 40,"sex": "female"}
(3)运行结果同上面是一样的:

3,Dataset 结合 DSL 语法的使用(强类型 api)
(1)下面代码我们使用 DSL 语法对 Dataset 进行数据处理:
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
// 定义一个People样例类
case class People(name: String, age: Long, sex: String)
object Hello {
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("Hello")
.master("local[*]")
.getOrCreate()
// 导入隐式转换
import spark.implicits._
// 定义 Schema, 用于指定列名以及列中的数据类型
val peopleSchema: StructType = new StructType()
.add("name", StringType)
.add("age", LongType)
.add("sex", StringType)
// 创建一个流式DataFrame,这里从文件系统中读取数据
val peopleDF: DataFrame = spark.readStream
.format("json")
.schema(peopleSchema)
.load("/Volumes/BOOTCAMP/test") // 必须是目录
// 转成 ds
val peopleDS: Dataset[People] = peopleDF.as[People]
// 查询年龄大于20的所有人员信息
val ds: Dataset[String] = peopleDS.filter(_.age > 20).map(_.name)
// 启动查询, 把结果打印到控制台
val query = ds.writeStream
.outputMode("append") // 使用append输出模式
.format("console")
.start()
// 等待应用程序终止
query.awaitTermination()
//关闭 Spark
spark.stop()
}
}
(2)程序启动后,我们在指定目录下添加一个 people.json 文件,文件内容如下:
{"name": "hangge","age": 100,"sex": "male"}
{"name": "andy","age": 30,"sex": "male"}
{"name": "justin","age": 19,"sex": "male"}
{"name": "lisi","age": 18,"sex": "male"}
{"name": "zs","age": 10,"sex": "female"}
{"name": "zhiling","age": 40,"sex": "female"}
(3)运行结果如下:
全部评论(0)