返回 导航

Spark

hangge.com

Spark - SparkSQL使用详解3(SQL语法、DSL语法)

作者:hangge | 2023-11-22 08:54

一、SQL 语法

1,基本介绍

  • SQL 语法是一种结构化查询语言,用于处理和管理关系型数据。在 Spark SQL 中,我们可以使用标准的 SQL 查询语句来对注册的临时视图或者全局视图进行操作。
  • SQL 语法非常直观,对于熟悉 SQL 的用户来说非常友好,尤其是那些习惯使用 SQL 进行数据查询和分析的开发者。
  • Spark SQL SQL 语法包括常见的 SQL 查询语句,例如 SELECTFROMWHEREGROUP BYORDER BY 等,还支持 JOIN 操作、子查询、聚合函数以及其他高级功能。

2,DataFrame 结合 SQL 语法的使用

(1)下面代码我们将 DataFrame 注册为临时视图,然后使用 SQL 语法进行数据处理:
object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 读取原始数据,创建DataFrame
    val data = Seq(
      (1, "老A", 30, 1),
      (2, "大B", 25, 0),
      (3, "小C", 15, 0)
    )
    val df:DataFrame = data.toDF("id", "name", "age", "sex")

    // 注册DataFrame为临时视图
    df.createOrReplaceTempView("people")

    // 使用SELECT语句选择name列
    val result1: DataFrame = spark.sql("SELECT name FROM people")
    result1.show()

    // 使用WHERE语句过滤年龄小于等于25岁的人
    val result2: DataFrame = spark.sql("SELECT * FROM people WHERE age <= 25")
    result2.show()

    // 使用GROUP BY语句按照sex列对数据进行分组,并计算每个组的平均年龄
    val result3: DataFrame = spark.sql("SELECT sex, AVG(age) as avg_age FROM people GROUP BY sex")
    result3.show()

    // 使用ORDER BY语句按照age列对数据进行升序排序
    val result4: DataFrame = spark.sql("SELECT name, age FROM people ORDER BY age")
    result4.show()

    // 创建另一个DataFrame
    val addressData = Seq(
      (1, "苏州"),
      (2, "无锡"),
      (3, "广州")
    )
    val addressDF: DataFrame = addressData.toDF("id", "city")

    // 注册另一个DataFrame为临时视图
    addressDF.createOrReplaceTempView("addresses")

    // 使用JOIN语句将people表和addresses表连接在一起
    val result5: DataFrame =
      spark.sql("SELECT p.name, p.age, a.city FROM people p JOIN addresses a ON p.id = a.id")
    result5.show()
    
    //关闭 Spark
    spark.stop()
  }
}

(2)运行结果如下:

3,Dataset 结合 SQL 语法的使用

(1)下面代码我们将 Dataset 注册为临时视图,然后使用 SQL 语法进行数据处理:
// 定义样例类
case class Person(id: Int, name: String, age: Int, sex: Int)
case class Address(id: Int, city: String)

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 读取原始数据,创建Dataset
    val data = Seq(
      Person(1, "老A", 30, 1),
      Person(2, "大B", 25, 0),
      Person(3, "小C", 15, 0)
    )
    val ds:Dataset[Person] = data.toDS()

    // 注册Dataset为临时视图
    ds.createOrReplaceTempView("people")

    // 使用SELECT语句选择name列
    val result1: Dataset[String] = spark.sql("SELECT name FROM people").as[String]
    result1.show()

    // 使用WHERE语句过滤年龄小于等于25岁的人
    val result2: Dataset[Person] = spark.sql("SELECT * FROM people WHERE age <= 25").as[Person]
    result2.show()

    // 使用GROUP BY语句按照sex列对数据进行分组,并计算每个组的平均年龄
    val result3: Dataset[(Int, Double)] = spark
      .sql("SELECT sex, AVG(age) as avg_age FROM people GROUP BY sex")
      .as[(Int, Double)]
    result3.show()

    // 使用ORDER BY语句按照age列对数据进行升序排序
    val result4: Dataset[Person] = spark.sql("SELECT * FROM people ORDER BY age").as[Person]
    result4.show()

    // 创建另一个Dataset
    val addressData = Seq(
      Address(1, "苏州"),
      Address(2, "无锡"),
      Address(3, "广州")
    )
    val addressDS: Dataset[Address] = addressData.toDS()

    // 注册另一个DataFrame为临时视图
    addressDS.createOrReplaceTempView("addresses")

    // 使用JOIN语句将people表和addresses表连接在一起
    val joinedResult: DataFrame =
      spark.sql("SELECT p.name, p.age, a.city FROM people p JOIN addresses a ON p.id = a.id")
    joinedResult.show()

    //关闭 Spark
    spark.stop()
  }
}

(2)运行结果如下:

二、DSL 语法

1,基本介绍

  • 特定领域语言(domain-specific language, DSL)允许我们使用编程语言(如 ScalaJavaPython)中的方法和函数来操作数据。使用 DSL 语法风格不必去创建临时视图。
  • DSL 语法通过函数式编程的方式来构建查询操作,使用方法调用和操作符来代替 SQL 语句,这样的代码更加灵活和易于维护。DSL 语法提供了类型安全、编译时检查和代码提示等优点。

2,DataFrame 结合 DSL 语法的使用

(1)下面代码我们使用 DSL 语法对 DataFrame 进行数据处理:
object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 读取原始数据,创建DataFrame
    val data = Seq(
      (1, "老A", 30, 1),
      (2, "大B", 25, 0),
      (3, "小C", 15, 0)
    )
    val df:DataFrame = data.toDF("id", "name", "age", "sex")

    // 使用select()选择name列
    val result1: DataFrame = df.select($"name")
    result1.show()

    // 使用filter()过滤年龄小于等于25岁的人
    val result2: DataFrame = df.filter($"age" <= 25)
    result2.show()

    // 使用groupBy()按照sex列进行分组,并计算每个组的平均年龄
    val result3: DataFrame = df.groupBy($"sex").agg(avg($"age").alias("avg_age"))
    result3.show()

    // 使用orderBy()按照age列进行升序排序
    val result4: DataFrame = df.orderBy($"age")
    result4.show()

    // 创建另一个DataFrame
    val addressData = Seq(
      (1, "苏州"),
      (2, "无锡"),
      (3, "广州")
    )
    val addressDF: DataFrame = addressData.toDF("id", "city")

    // 使用DSL语法进行JOIN操作
    val result5: DataFrame = df.join(addressDF, df("id") === addressDF("id"))
      .select(df("name"), df("age"), addressDF("city"))
    result5.show()

    //关闭 Spark
    spark.stop()
  }
}

(2)运行结果如下,可以看到效果和使用 SQL 语法是一样的:

3,Dataset 结合 DSL 语法的使用

(1)下面代码我们使用 DSL 语法对 Dataset 进行数据处理:
// 定义样例类
case class Person(id: Int, name: String, age: Int, sex: Int)
case class Address(id: Int, city: String)

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 读取原始数据,创建Dataset
    val data = Seq(
      Person(1, "老A", 30, 1),
      Person(2, "大B", 25, 0),
      Person(3, "小C", 15, 0)
    )
    val ds:Dataset[Person] = data.toDS()

    // 使用select()选择name列
    val result1: Dataset[String] = ds.select($"name").as[String]
    result1.show()

    // 使用filter()过滤年龄小于等于25岁的人
    val result2: Dataset[Person] = ds.filter($"age" <= 25)
    result2.show()

    // 使用groupBy()按照sex列进行分组,并计算每个组的平均年龄
    val result3: Dataset[(Int, Double)] = ds.groupBy($"sex").agg(avg($"age").as("avg_age"))
      .as[(Int, Double)]
    result3.show()

    // 使用orderBy()按照age列进行升序排序
    val result4: Dataset[Person] = ds.orderBy($"age")
    result4.show()

    // 创建另一个Dataset
    val addressData = Seq(
      Address(1, "苏州"),
      Address(2, "无锡"),
      Address(3, "广州")
    )
    val addressDS: Dataset[Address] = addressData.toDS()

    // 使用DSL语法进行JOIN操作,并转换为Dataset
    val result5: Dataset[(String, Int, String)] = ds
      .joinWith(addressDS, ds("id") === addressDS("id"))
      .map { case (person, address) => (person.name, person.age, address.city) }
    result5.show()

    //关闭 Spark
    spark.stop()
  }
}

(2)运行结果如下:

附:内置函数汇总

Spark 中提供了很多内置的函数,其实这些函数和 hive 中的函数是类似的,具体如下:
种类 函数
聚合函数 avgcountcountDistinctfirstlastmaxmeanminsumsumDistinct
集合函数 array_containsexplodesize
日期/时间函数 datediffdate_adddate_subadd_monthslast_daynext_daymonths_betweencurrent_datecurrent_timestampdate_format
数学函数 absceilfloorround
混合函数 ifisnullmd5notrandwhen
字符串函数 concatget_json_objectlengthreversesplitupper
窗口函数 denseRankrankrowNumber
评论

全部评论(0)

回到顶部