Spark - SparkSQL使用详解3(SQL语法、DSL语法)
作者:hangge | 2023-11-22 08:54
一、SQL 语法
1,基本介绍
- SQL 语法是一种结构化查询语言,用于处理和管理关系型数据。在 Spark SQL 中,我们可以使用标准的 SQL 查询语句来对注册的临时视图或者全局视图进行操作。
- SQL 语法非常直观,对于熟悉 SQL 的用户来说非常友好,尤其是那些习惯使用 SQL 进行数据查询和分析的开发者。
- Spark SQL 的 SQL 语法包括常见的 SQL 查询语句,例如 SELECT、FROM、WHERE、GROUP BY、ORDER BY 等,还支持 JOIN 操作、子查询、聚合函数以及其他高级功能。
2,DataFrame 结合 SQL 语法的使用
(1)下面代码我们将 DataFrame 注册为临时视图,然后使用 SQL 语法进行数据处理:
object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
// 导入隐式转换
import spark.implicits._
// 读取原始数据,创建DataFrame
val data = Seq(
(1, "老A", 30, 1),
(2, "大B", 25, 0),
(3, "小C", 15, 0)
)
val df:DataFrame = data.toDF("id", "name", "age", "sex")
// 注册DataFrame为临时视图
df.createOrReplaceTempView("people")
// 使用SELECT语句选择name列
val result1: DataFrame = spark.sql("SELECT name FROM people")
result1.show()
// 使用WHERE语句过滤年龄小于等于25岁的人
val result2: DataFrame = spark.sql("SELECT * FROM people WHERE age <= 25")
result2.show()
// 使用GROUP BY语句按照sex列对数据进行分组,并计算每个组的平均年龄
val result3: DataFrame = spark.sql("SELECT sex, AVG(age) as avg_age FROM people GROUP BY sex")
result3.show()
// 使用ORDER BY语句按照age列对数据进行升序排序
val result4: DataFrame = spark.sql("SELECT name, age FROM people ORDER BY age")
result4.show()
// 创建另一个DataFrame
val addressData = Seq(
(1, "苏州"),
(2, "无锡"),
(3, "广州")
)
val addressDF: DataFrame = addressData.toDF("id", "city")
// 注册另一个DataFrame为临时视图
addressDF.createOrReplaceTempView("addresses")
// 使用JOIN语句将people表和addresses表连接在一起
val result5: DataFrame =
spark.sql("SELECT p.name, p.age, a.city FROM people p JOIN addresses a ON p.id = a.id")
result5.show()
//关闭 Spark
spark.stop()
}
}
(2)运行结果如下:

3,Dataset 结合 SQL 语法的使用
(1)下面代码我们将 Dataset 注册为临时视图,然后使用 SQL 语法进行数据处理:// 定义样例类
case class Person(id: Int, name: String, age: Int, sex: Int)
case class Address(id: Int, city: String)
object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
// 导入隐式转换
import spark.implicits._
// 读取原始数据,创建Dataset
val data = Seq(
Person(1, "老A", 30, 1),
Person(2, "大B", 25, 0),
Person(3, "小C", 15, 0)
)
val ds:Dataset[Person] = data.toDS()
// 注册Dataset为临时视图
ds.createOrReplaceTempView("people")
// 使用SELECT语句选择name列
val result1: Dataset[String] = spark.sql("SELECT name FROM people").as[String]
result1.show()
// 使用WHERE语句过滤年龄小于等于25岁的人
val result2: Dataset[Person] = spark.sql("SELECT * FROM people WHERE age <= 25").as[Person]
result2.show()
// 使用GROUP BY语句按照sex列对数据进行分组,并计算每个组的平均年龄
val result3: Dataset[(Int, Double)] = spark
.sql("SELECT sex, AVG(age) as avg_age FROM people GROUP BY sex")
.as[(Int, Double)]
result3.show()
// 使用ORDER BY语句按照age列对数据进行升序排序
val result4: Dataset[Person] = spark.sql("SELECT * FROM people ORDER BY age").as[Person]
result4.show()
// 创建另一个Dataset
val addressData = Seq(
Address(1, "苏州"),
Address(2, "无锡"),
Address(3, "广州")
)
val addressDS: Dataset[Address] = addressData.toDS()
// 注册另一个DataFrame为临时视图
addressDS.createOrReplaceTempView("addresses")
// 使用JOIN语句将people表和addresses表连接在一起
val joinedResult: DataFrame =
spark.sql("SELECT p.name, p.age, a.city FROM people p JOIN addresses a ON p.id = a.id")
joinedResult.show()
//关闭 Spark
spark.stop()
}
}
(2)运行结果如下:
二、DSL 语法
1,基本介绍
- 特定领域语言(domain-specific language, DSL)允许我们使用编程语言(如 Scala、Java、Python)中的方法和函数来操作数据。使用 DSL 语法风格不必去创建临时视图。
- DSL 语法通过函数式编程的方式来构建查询操作,使用方法调用和操作符来代替 SQL 语句,这样的代码更加灵活和易于维护。DSL 语法提供了类型安全、编译时检查和代码提示等优点。
2,DataFrame 结合 DSL 语法的使用
(1)下面代码我们使用 DSL 语法对 DataFrame 进行数据处理:
object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
// 导入隐式转换
import spark.implicits._
// 读取原始数据,创建DataFrame
val data = Seq(
(1, "老A", 30, 1),
(2, "大B", 25, 0),
(3, "小C", 15, 0)
)
val df:DataFrame = data.toDF("id", "name", "age", "sex")
// 使用select()选择name列
val result1: DataFrame = df.select($"name")
result1.show()
// 使用filter()过滤年龄小于等于25岁的人
val result2: DataFrame = df.filter($"age" <= 25)
result2.show()
// 使用groupBy()按照sex列进行分组,并计算每个组的平均年龄
val result3: DataFrame = df.groupBy($"sex").agg(avg($"age").alias("avg_age"))
result3.show()
// 使用orderBy()按照age列进行升序排序
val result4: DataFrame = df.orderBy($"age")
result4.show()
// 创建另一个DataFrame
val addressData = Seq(
(1, "苏州"),
(2, "无锡"),
(3, "广州")
)
val addressDF: DataFrame = addressData.toDF("id", "city")
// 使用DSL语法进行JOIN操作
val result5: DataFrame = df.join(addressDF, df("id") === addressDF("id"))
.select(df("name"), df("age"), addressDF("city"))
result5.show()
//关闭 Spark
spark.stop()
}
}
(2)运行结果如下,可以看到效果和使用 SQL 语法是一样的:

3,Dataset 结合 DSL 语法的使用
(1)下面代码我们使用 DSL 语法对 Dataset 进行数据处理:
// 定义样例类
case class Person(id: Int, name: String, age: Int, sex: Int)
case class Address(id: Int, city: String)
object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
// 导入隐式转换
import spark.implicits._
// 读取原始数据,创建Dataset
val data = Seq(
Person(1, "老A", 30, 1),
Person(2, "大B", 25, 0),
Person(3, "小C", 15, 0)
)
val ds:Dataset[Person] = data.toDS()
// 使用select()选择name列
val result1: Dataset[String] = ds.select($"name").as[String]
result1.show()
// 使用filter()过滤年龄小于等于25岁的人
val result2: Dataset[Person] = ds.filter($"age" <= 25)
result2.show()
// 使用groupBy()按照sex列进行分组,并计算每个组的平均年龄
val result3: Dataset[(Int, Double)] = ds.groupBy($"sex").agg(avg($"age").as("avg_age"))
.as[(Int, Double)]
result3.show()
// 使用orderBy()按照age列进行升序排序
val result4: Dataset[Person] = ds.orderBy($"age")
result4.show()
// 创建另一个Dataset
val addressData = Seq(
Address(1, "苏州"),
Address(2, "无锡"),
Address(3, "广州")
)
val addressDS: Dataset[Address] = addressData.toDS()
// 使用DSL语法进行JOIN操作,并转换为Dataset
val result5: Dataset[(String, Int, String)] = ds
.joinWith(addressDS, ds("id") === addressDS("id"))
.map { case (person, address) => (person.name, person.age, address.city) }
result5.show()
//关闭 Spark
spark.stop()
}
}
(2)运行结果如下:
附:内置函数汇总
Spark 中提供了很多内置的函数,其实这些函数和 hive 中的函数是类似的,具体如下:
| 种类 | 函数 |
| 聚合函数 | avg、count、countDistinct、first、last、max、mean、min、sum、sumDistinct |
| 集合函数 | array_contains、explode、size |
| 日期/时间函数 | datediff、date_add、date_sub、add_months、last_day、next_day、months_between、current_date、current_timestamp、date_format |
| 数学函数 | abs、ceil、floor、round |
| 混合函数 | if、isnull、md5、not、rand、when |
| 字符串函数 | concat、get_json_object、length、reverse、split、upper |
| 窗口函数 | denseRank、rank、rowNumber |
全部评论(0)