Spark - SparkSQL使用详解3(SQL语法、DSL语法)
作者:hangge | 2023-11-22 08:54
一、SQL 语法
1,基本介绍
- SQL 语法是一种结构化查询语言,用于处理和管理关系型数据。在 Spark SQL 中,我们可以使用标准的 SQL 查询语句来对注册的临时视图或者全局视图进行操作。
- SQL 语法非常直观,对于熟悉 SQL 的用户来说非常友好,尤其是那些习惯使用 SQL 进行数据查询和分析的开发者。
- Spark SQL 的 SQL 语法包括常见的 SQL 查询语句,例如 SELECT、FROM、WHERE、GROUP BY、ORDER BY 等,还支持 JOIN 操作、子查询、聚合函数以及其他高级功能。
2,DataFrame 结合 SQL 语法的使用
(1)下面代码我们将 DataFrame 注册为临时视图,然后使用 SQL 语法进行数据处理:
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() // 导入隐式转换 import spark.implicits._ // 读取原始数据,创建DataFrame val data = Seq( (1, "老A", 30, 1), (2, "大B", 25, 0), (3, "小C", 15, 0) ) val df:DataFrame = data.toDF("id", "name", "age", "sex") // 注册DataFrame为临时视图 df.createOrReplaceTempView("people") // 使用SELECT语句选择name列 val result1: DataFrame = spark.sql("SELECT name FROM people") result1.show() // 使用WHERE语句过滤年龄小于等于25岁的人 val result2: DataFrame = spark.sql("SELECT * FROM people WHERE age <= 25") result2.show() // 使用GROUP BY语句按照sex列对数据进行分组,并计算每个组的平均年龄 val result3: DataFrame = spark.sql("SELECT sex, AVG(age) as avg_age FROM people GROUP BY sex") result3.show() // 使用ORDER BY语句按照age列对数据进行升序排序 val result4: DataFrame = spark.sql("SELECT name, age FROM people ORDER BY age") result4.show() // 创建另一个DataFrame val addressData = Seq( (1, "苏州"), (2, "无锡"), (3, "广州") ) val addressDF: DataFrame = addressData.toDF("id", "city") // 注册另一个DataFrame为临时视图 addressDF.createOrReplaceTempView("addresses") // 使用JOIN语句将people表和addresses表连接在一起 val result5: DataFrame = spark.sql("SELECT p.name, p.age, a.city FROM people p JOIN addresses a ON p.id = a.id") result5.show() //关闭 Spark spark.stop() } }
(2)运行结果如下:
3,Dataset 结合 SQL 语法的使用
(1)下面代码我们将 Dataset 注册为临时视图,然后使用 SQL 语法进行数据处理:// 定义样例类 case class Person(id: Int, name: String, age: Int, sex: Int) case class Address(id: Int, city: String) object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() // 导入隐式转换 import spark.implicits._ // 读取原始数据,创建Dataset val data = Seq( Person(1, "老A", 30, 1), Person(2, "大B", 25, 0), Person(3, "小C", 15, 0) ) val ds:Dataset[Person] = data.toDS() // 注册Dataset为临时视图 ds.createOrReplaceTempView("people") // 使用SELECT语句选择name列 val result1: Dataset[String] = spark.sql("SELECT name FROM people").as[String] result1.show() // 使用WHERE语句过滤年龄小于等于25岁的人 val result2: Dataset[Person] = spark.sql("SELECT * FROM people WHERE age <= 25").as[Person] result2.show() // 使用GROUP BY语句按照sex列对数据进行分组,并计算每个组的平均年龄 val result3: Dataset[(Int, Double)] = spark .sql("SELECT sex, AVG(age) as avg_age FROM people GROUP BY sex") .as[(Int, Double)] result3.show() // 使用ORDER BY语句按照age列对数据进行升序排序 val result4: Dataset[Person] = spark.sql("SELECT * FROM people ORDER BY age").as[Person] result4.show() // 创建另一个Dataset val addressData = Seq( Address(1, "苏州"), Address(2, "无锡"), Address(3, "广州") ) val addressDS: Dataset[Address] = addressData.toDS() // 注册另一个DataFrame为临时视图 addressDS.createOrReplaceTempView("addresses") // 使用JOIN语句将people表和addresses表连接在一起 val joinedResult: DataFrame = spark.sql("SELECT p.name, p.age, a.city FROM people p JOIN addresses a ON p.id = a.id") joinedResult.show() //关闭 Spark spark.stop() } }
(2)运行结果如下:
二、DSL 语法
1,基本介绍
- 特定领域语言(domain-specific language, DSL)允许我们使用编程语言(如 Scala、Java、Python)中的方法和函数来操作数据。使用 DSL 语法风格不必去创建临时视图。
- DSL 语法通过函数式编程的方式来构建查询操作,使用方法调用和操作符来代替 SQL 语句,这样的代码更加灵活和易于维护。DSL 语法提供了类型安全、编译时检查和代码提示等优点。
2,DataFrame 结合 DSL 语法的使用
(1)下面代码我们使用 DSL 语法对 DataFrame 进行数据处理:
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() // 导入隐式转换 import spark.implicits._ // 读取原始数据,创建DataFrame val data = Seq( (1, "老A", 30, 1), (2, "大B", 25, 0), (3, "小C", 15, 0) ) val df:DataFrame = data.toDF("id", "name", "age", "sex") // 使用select()选择name列 val result1: DataFrame = df.select($"name") result1.show() // 使用filter()过滤年龄小于等于25岁的人 val result2: DataFrame = df.filter($"age" <= 25) result2.show() // 使用groupBy()按照sex列进行分组,并计算每个组的平均年龄 val result3: DataFrame = df.groupBy($"sex").agg(avg($"age").alias("avg_age")) result3.show() // 使用orderBy()按照age列进行升序排序 val result4: DataFrame = df.orderBy($"age") result4.show() // 创建另一个DataFrame val addressData = Seq( (1, "苏州"), (2, "无锡"), (3, "广州") ) val addressDF: DataFrame = addressData.toDF("id", "city") // 使用DSL语法进行JOIN操作 val result5: DataFrame = df.join(addressDF, df("id") === addressDF("id")) .select(df("name"), df("age"), addressDF("city")) result5.show() //关闭 Spark spark.stop() } }
(2)运行结果如下,可以看到效果和使用 SQL 语法是一样的:
3,Dataset 结合 DSL 语法的使用
(1)下面代码我们使用 DSL 语法对 Dataset 进行数据处理:
// 定义样例类 case class Person(id: Int, name: String, age: Int, sex: Int) case class Address(id: Int, city: String) object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() // 导入隐式转换 import spark.implicits._ // 读取原始数据,创建Dataset val data = Seq( Person(1, "老A", 30, 1), Person(2, "大B", 25, 0), Person(3, "小C", 15, 0) ) val ds:Dataset[Person] = data.toDS() // 使用select()选择name列 val result1: Dataset[String] = ds.select($"name").as[String] result1.show() // 使用filter()过滤年龄小于等于25岁的人 val result2: Dataset[Person] = ds.filter($"age" <= 25) result2.show() // 使用groupBy()按照sex列进行分组,并计算每个组的平均年龄 val result3: Dataset[(Int, Double)] = ds.groupBy($"sex").agg(avg($"age").as("avg_age")) .as[(Int, Double)] result3.show() // 使用orderBy()按照age列进行升序排序 val result4: Dataset[Person] = ds.orderBy($"age") result4.show() // 创建另一个Dataset val addressData = Seq( Address(1, "苏州"), Address(2, "无锡"), Address(3, "广州") ) val addressDS: Dataset[Address] = addressData.toDS() // 使用DSL语法进行JOIN操作,并转换为Dataset val result5: Dataset[(String, Int, String)] = ds .joinWith(addressDS, ds("id") === addressDS("id")) .map { case (person, address) => (person.name, person.age, address.city) } result5.show() //关闭 Spark spark.stop() } }
(2)运行结果如下:
附:内置函数汇总
Spark 中提供了很多内置的函数,其实这些函数和 hive 中的函数是类似的,具体如下:
种类 | 函数 |
聚合函数 | avg、count、countDistinct、first、last、max、mean、min、sum、sumDistinct |
集合函数 | array_contains、explode、size |
日期/时间函数 | datediff、date_add、date_sub、add_months、last_day、next_day、months_between、current_date、current_timestamp、date_format |
数学函数 | abs、ceil、floor、round |
混合函数 | if、isnull、md5、not、rand、when |
字符串函数 | concat、get_json_object、length、reverse、split、upper |
窗口函数 | denseRank、rank、rowNumber |
全部评论(0)