返回 导航

Spark

hangge.com

Spark - Structured Streaming使用详解5(SQL语法、DSL语法)

作者:hangge | 2024-01-15 08:35
    从输入源获取数据后,我们就可以在 streaming DataFrames Datasets上 应用各种操作。操作方式主要分两种:一种是直接执行 sql,另一种则是特定类型的 apiDSL)。下面分别进行介绍。

六、SQL 语法

1,基本介绍

  • SQL 语法是一种结构化查询语言,用于处理和管理关系型数据。在 Spark SQL 中,我们可以使用标准的 SQL 查询语句来对注册的临时视图或者全局视图进行操作。
  • SQL 语法非常直观,对于熟悉 SQL 的用户来说非常友好,尤其是那些习惯使用 SQL 进行数据查询和分析的开发者。
  • Spark SQL SQL 语法包括常见的 SQL 查询语句,例如 SELECTFROMWHEREGROUP BYORDER BY 等,还支持 JOIN 操作、子查询、聚合函数以及其他高级功能。

2,使用样例

(1)下面代码我们将 DataFrame 注册为临时视图,然后使用 SQL 语法进行数据处理:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("Hello")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 定义 Schema, 用于指定列名以及列中的数据类型
    val peopleSchema: StructType = new StructType()
      .add("name", StringType)
      .add("age", LongType)
      .add("sex", StringType)

    // 创建一个流式DataFrame,这里从文件系统中读取数据
    val peopleDF: DataFrame = spark.readStream
      .format("json")
      .schema(peopleSchema)
      .load("/Volumes/BOOTCAMP/test")  // 必须是目录

    // 创建临时表
    peopleDF.createOrReplaceTempView("people")

    // 查询年龄大于20的所有人员信息
    val df: DataFrame = spark.sql("select * from people where age > 20")

    // 启动查询, 把结果打印到控制台
    val query = df.writeStream
      .outputMode("append") // 使用append输出模式
      .format("console")
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

(2)程序启动后,我们在指定目录下添加一个 people.json 文件,文件内容如下:
{"name": "hangge","age": 100,"sex": "male"}
{"name": "andy","age": 30,"sex": "male"}
{"name": "justin","age": 19,"sex": "male"}
{"name": "lisi","age": 18,"sex": "male"}
{"name": "zs","age": 10,"sex": "female"}
{"name": "zhiling","age": 40,"sex": "female"}

(3)接着可以看到控制台输出如下内容:

七、DSL 语法

1,基本介绍

  • 特定领域语言(domain-specific languageDSL)允许我们使用编程语言(如 ScalaJavaPython)中的方法和函数来操作数据。使用 DSL 语法风格不必去创建临时视图。
  • DSL 语法通过函数式编程的方式来构建查询操作,使用方法调用和操作符来代替 SQL 语句,这样的代码更加灵活和易于维护。DSL 语法提供了类型安全、编译时检查和代码提示等优点。

2,DataFrame 结合 DSL 语法的使用(弱类型 api)

(1)下面代码我们使用 DSL 语法对 DataFrame 进行数据处理:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("Hello")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 定义 Schema, 用于指定列名以及列中的数据类型
    val peopleSchema: StructType = new StructType()
      .add("name", StringType)
      .add("age", LongType)
      .add("sex", StringType)

    // 创建一个流式DataFrame,这里从文件系统中读取数据
    val peopleDF: DataFrame = spark.readStream
      .format("json")
      .schema(peopleSchema)
      .load("/Volumes/BOOTCAMP/test")  // 必须是目录

    // 查询年龄大于20的所有人员信息
    val df: DataFrame = peopleDF.select("name","age", "sex").where("age > 20")

    // 启动查询, 把结果打印到控制台
    val query = df.writeStream
      .outputMode("append") // 使用append输出模式
      .format("console")
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

(2)程序启动后,我们在指定目录下添加一个 people.json 文件,文件内容如下:
{"name": "hangge","age": 100,"sex": "male"}
{"name": "andy","age": 30,"sex": "male"}
{"name": "justin","age": 19,"sex": "male"}
{"name": "lisi","age": 18,"sex": "male"}
{"name": "zs","age": 10,"sex": "female"}
{"name": "zhiling","age": 40,"sex": "female"}

(3)运行结果同上面是一样的:

3,Dataset 结合 DSL 语法的使用(强类型 api)

(1)下面代码我们使用 DSL 语法对 Dataset 进行数据处理:
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// 定义一个People样例类
case class People(name: String, age: Long, sex: String)

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("Hello")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 定义 Schema, 用于指定列名以及列中的数据类型
    val peopleSchema: StructType = new StructType()
      .add("name", StringType)
      .add("age", LongType)
      .add("sex", StringType)

    // 创建一个流式DataFrame,这里从文件系统中读取数据
    val peopleDF: DataFrame = spark.readStream
      .format("json")
      .schema(peopleSchema)
      .load("/Volumes/BOOTCAMP/test")  // 必须是目录

    // 转成 ds
    val peopleDS: Dataset[People] = peopleDF.as[People]

    // 查询年龄大于20的所有人员信息
    val ds: Dataset[String] = peopleDS.filter(_.age > 20).map(_.name)

    // 启动查询, 把结果打印到控制台
    val query = ds.writeStream
      .outputMode("append") // 使用append输出模式
      .format("console")
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

(2)程序启动后,我们在指定目录下添加一个 people.json 文件,文件内容如下:
{"name": "hangge","age": 100,"sex": "male"}
{"name": "andy","age": 30,"sex": "male"}
{"name": "justin","age": 19,"sex": "male"}
{"name": "lisi","age": 18,"sex": "male"}
{"name": "zs","age": 10,"sex": "female"}
{"name": "zhiling","age": 40,"sex": "female"}

(3)运行结果如下:
评论

全部评论(0)

回到顶部