Spark - SparkSQL使用详解1(DataFrame的创建:从RDD、文件、数据库、Hive)
作者:hangge | 2023-11-17 09:11
一、使用集合或数组直接创建 DataFrame
1,使用样例
下面代码我们定义了一个包含数据的集合和一个包含列名的数组。最后,我们通过调用 toDF 方法将集合转换为 DataFrame:
import spark.implicits._ 作用是引入 SparkSession 中的隐式转换,具体功能如下:
- 启用隐式转换:SparkSession 对象 spark 是一个特殊的 Spark 入口点,通过导入 spark.implicits._,我们可以将 SparkSession 对象的隐式转换启用。这意味着 Spark 会自动将一些常见的数据类型(如 RDD、Seq 等)转换为 Spark 的 DataFrame 或 Dataset,从而使得对数据的操作更加简洁和方便。
- 提供 toDF 方法:隐式转换使得集合对象(例如 Seq、List)可以使用 toDF 方法,直接将其转换为 DataFrame。这样,我们就可以直接从集合创建 DataFrame,而无需手动创建 Row 和 Schema。
- 启用其它 DataFrame 操作:通过隐式转换,我们可以使用一些方便的 DataFrame 操作,例如 select、filter、groupBy 等,而无需显示地导入其他相关的类。
object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
// 导入隐式转换,以便可以使用toDF方法将Seq转换为DataFrame
import spark.implicits._
// 使用集合创建DataFrame
val data = Seq(("Alice", 30), ("Bob", 25), ("Charlie", 22))
val columns = Seq("Name", "Age")
// 将集合转换为DataFrame
val df = data.toDF(columns: _*)
// 显示DataFrame内容
df.show()
//关闭 Spark
spark.stop()
}
}
2,运行结果

二、通过 RDD 创建 DataFrame
1,RDD 转换为 DataFrame
(1)下面样例类将 RDD 转换为 DataFrame:
object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
//RDD=>DataFrame=>DataSet 转换需要引入隐式转换规则,否则无法转换
//spark 不是包名,是上下文环境对象名
import spark.implicits._
// 创建RDD(通过集合)
val rdd = spark.sparkContext.makeRDD(Array(
(1, "Alice"),
(2, "Bob"),
(3, "Charlie"),
(4, "David"),
(5, "Eva")))
// 将RDD转换为DataFrame
val df = rdd.toDF("id", "name")
// 显示 DataFrame
df.show()
//关闭 Spark
spark.stop()
}
}
(2)运行结果如下:

2,DataFrame 转换为 RDD
(1)DataFrame 其实就是对 RDD 的封装,所以可以直接获取内部的 RDD,下面样例将 DataFrame 转换为 RDD:

object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
//RDD=>DataFrame=>DataSet 转换需要引入隐式转换规则,否则无法转换
//spark 不是包名,是上下文环境对象名
import spark.implicits._
// 创建RDD(通过集合)
val rdd = spark.sparkContext.makeRDD(Array(
(1, "Alice"),
(2, "Bob"),
(3, "Charlie"),
(4, "David"),
(5, "Eva")))
// 将RDD转换为DataFrame
val df = rdd.toDF("id", "name")
// 将DataFrame转换为RDD
var rdd2 = df.rdd
// 显示RDD
val array:Array[org.apache.spark.sql.Row] = rdd2.collect()
println(array(0))
println(array(0)(0))
println(array(0).getAs[String]("name"))
//关闭 Spark
spark.stop()
}
}
(2)运行结果如下:

三、读取文件数据并创建 DataFrame
1,读取 json 文件创建 DataFrame
(1)假设我们项目中有个 json 文件,内容如下:
{"username":"hangge","age":20}
{"username":"xiaoli","age":11}
{"username":"hutu","age":33}
(2)下面代码我们将该文件加载到 DataFrame:
// 读取 json 文件创建 DataFrame
val df = spark.read.json("datas/user.json")
df.show()
(3)我们也可以将 DataFrame 保存为 json 文件:
// 读取 json 文件创建 DataFrame
val df = spark.read.json("datas/user.json")
df.show()
// 将 DataFrame 保存到json文件
df.write.json("output")
2,读取 txt 文件创建 DataFrame
(1)假设我们项目中有个 txt 文件,内容如下:
welcome hangge.com 航歌 china
(2)下面代码我们将该文件加载到 DataFrame,读取到的 DataFrame 只会有一个列,列名默认称之为 value :
// 读取 txt 文件创建 DataFrame
val df = spark.read.text("datas/word.txt")
df.show()
(3)我们也可以将 DataFrame 保存为 txt 文件:
// 读取 txt 文件创建 DataFrame
val df = spark.read.text("datas/word.txt")
df.show()
// 将 DataFrame 保存到 txt 文件
df.write.text("output")
3,读取 csv 文件创建 DataFrame
(1)假设我们项目中有个 csv 文件,内容如下:
name,age hangge,20 xiaoli,11 hutu,33
(2)下面代码我们将该文件加载到 DataFrame:
// 读取 csv 文件创建 DataFrame
val df = spark.read.option("sep", ",") // 列分隔符
.option("inferSchema", "true")
.option("header", "true") // 是否有CSV标头
//.schema("name STRING, age INT") // 指定列名和类型
.csv("datas/user.csv")
df.show()
(3)我们也可以将 DataFrame 保存为 csv 文件:
// 读取 csv 文件创建 DataFrame
val df = spark.read.option("sep", ",") // 列分隔符
.option("inferSchema", "true")
.option("header", "true") // 是否有CSV标头
//.schema("name STRING, age INT") // 指定列名和类型
.csv("datas/user.csv")
df.show()
// 将 DataFrame 保存到 csv 文件
df.write.csv("output")
4,读取 Parquet 文件创建 DataFrame
(1)Spark SQL 的默认数据源为 Parquet 格式。Parquet 是一种能够有效存储嵌套数据的列式存储格式。下面代码我们将 Parquet 文件加载到 DataFrame:
parquet 对比普通的文本文件的区别:
- parquet 内置 schema(列名 \ 列类型 \ 是否为空)。
- 存储是以列作为存储格式。
- 存储是序列化存储在文件中的(有压缩属性体积小)。
// 读取 parquet 文件创建 DataFrame
val df = spark.read.parquet("datas/user.parquet")
df.show()
(2)下面代码将 DataFrame 写入到 Parquet 文件中:
// 将 DataFrame 保存到 parquet 文件
df.write.parquet("output")
(3)Parquet 文件不能直接打开查看,如果想要查看内容,可以“Avro and Parquet View”这个插件来查看,安装后 IDEA 底部会有个“Avro/Parquet View”按钮,点击后将 Parquet 文件拖入到打开的窗口即可进行查看:

四、读取数据库数据并创建 DataFrame
1,准备工作
(1)Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame,通过对 DataFrame 一系列的计算后,还可以将数据再写回关系型数据库中。假设 MySQL 数据库中有张 user_info 表,表中有如下数据:

(2)首先我们需要编辑项目的 pom.xml 文件,添加数据库驱动依赖:
<!-- 数据库驱动依赖 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
2,读取数据
(1)下面样例中我们使用了 3 种方式,从数据库中读取表数据并创建 DataFrame:
// 方式1:通用的 load 方法读取
val df1 = spark.read.format("jdbc")
.option("url", "jdbc:mysql://192.168.43.96:3306/hangge")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "hangge1234")
.option("dbtable", "user_info")
.load()
// 显示DataFrame内容
df1.show()
// 方式2:通用的 load 方法读取(另一种形式参数)
val df2 = spark.read.format("jdbc")
.options(Map(
"url"->"jdbc:mysql://192.168.43.96:3306/hangge?user=root&password=hangge1234",
"dbtable"->"user_info",
"driver"->"com.mysql.jdbc.Driver"))
.load()
// 显示DataFrame内容
df2.show()
// 方式3:使用 jdbc 方法读取
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "hangge1234")
val df3 = spark.read.jdbc("jdbc:mysql://192.168.43.96:3306/hangge", "user_info", props)
// 显示DataFrame内容
df3.show()
(2)运行结果如下:

3,写入数据
(1)下面样例中我们使用了 2 种方式将 DataFrame 数据写入到数据库表中:
// 读取原始数据,创建DataFrame
val data1 = Seq((3, "小C", "c0c0", 33))
val data2 = Seq((4, "小D", "d0d0", 3))
val columns = Seq("id", "user_name", "pass_word", "age")
val df1 = data1.toDF(columns: _*)
val df2 = data2.toDF(columns: _*)
//方式 1:通用的方式 format 指定写出类型
df1.write
.format("jdbc")
.option("url", "jdbc:mysql://192.168.43.96:3306/hangge")
.option("user", "root")
.option("password", "hangge1234")
.option("dbtable", "user_info")
.mode(SaveMode.Append)
.save()
//方式 2:通过 jdbc 方法
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "hangge1234")
df2.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.43.96:3306/hangge", "user_info", props)
(2)执行后查看数据库可以看到数据已经成功插入到表中:

五、通过 Hive 创建 DataFrame
1,对 Hive 的支持
Spark 支持两种方式来使用 Hive:内置的 Hive 和外部的 Hive。这两种方式在数据处理和元数据存储方面有所不同。
- 内置的 Hive:指的是在 Spark 应用程序中直接使用内置的 Hive Metastore 来存储表的元数据信息,而不依赖外部的 Hive 服务。该方式时候日常的开发学习。
- 外部的 Hive:指的是 Spark 连接到一个独立的外部 Hive Metastore 服务,该服务可能是在独立的 Hive 服务器中或是与 Hadoop 集群中的 Hive Metastore 共享。在这种方式下,Spark 只使用外部的 Hive Metastore 来存储表的元数据信息,数据处理过程与内置的 Hive 没有区别。该方式适用于生产环境。
2,使用样例
(1)下面样例演示如何将 DataFrame 内容写入到 Hive 表中,并从 Hive 中读取数据创建 DataFrame:
object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
// 导入隐式转换
import spark.implicits._
// 读取原始数据,创建DataFrame
val data = Seq((1, "老A", 30), (2, "大B", 25), (3, "小C", 22))
val columns = Seq("id", "name", "age")
val df = data.toDF(columns: _*)
// 查看目前所有的Hive表
spark.sql("show tables").show()
// 将DataFrame内容写入到Hive表中
df.write.saveAsTable("user_info")
// 查看目前所有的Hive表
spark.sql("show tables").show()
// 从Hive中读取数据创建DataFrame
val hiveDF = spark.table("user_info")
// 显示DataFrame内容
hiveDF.show()
//关闭 Spark
spark.stop()
}
}
(2)执行后控制台输出结果如下:

(3)由于我使用的是内置的 Hive,可以看到项目根目录下会出现一个 spark-warehouse 文件夹,该文件夹下的的每个子文件夹是一张表,子文件夹名为表名,里面有 parquet 文件是表的数据文件。
附:SaveMode 介绍与使用
(1)Spark SQL 对于 save 操作,提供了不同的 save mode。 主要用来控制当目标位置已经有数据时应该如何处理。具体模式如下:
- SaveMode.ErrorIfExists(默认):如果目标位置已经存在数据,那么抛出一个异常
- SaveMode.Append:如果目标位置已经存在数据,那么将数据追加进去
- SaveMode.Overwrite:如果目标位置已经存在数据,那么就将已经存在的数据删除,用新数据进行覆盖
- SaveMode.Ignore:如果目标位置已经存在数据,那么就忽略,不做任何操作
注意:save 操作不会执行锁操作,并且也不是原子的,因此是有一定风险出现脏数据的。
(2)下面样例保存数据时,我们将 SaveMode 设置为 Overwrite,如果目标已存在,则覆盖。
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
// 导入隐式转换,以便可以使用toDF方法将Seq转换为DataFrame
import spark.implicits._
// 使用集合创建DataFrame
val data = Seq(("Alice", 30), ("Bob", 25), ("Charlie", 22))
val columns = Seq("Name", "Age")
// 将集合转换为DataFrame
val df = data.toDF(columns: _*)
// 保持数据到文件
df.write
.format("csv")
.mode(SaveMode.Overwrite) // 覆盖
.save("hdfs://node1:9000/out-save001")
//关闭 Spark
spark.stop()
}
}
全部评论(0)