Spark - SparkSQL使用详解2(Dataset的创建:从RDD、DataFrame、数据库、Hive)
作者:hangge | 2023-11-20 08:54
一、使用集合或数组直接创建 DataSet
1,使用样例
下面代码我们定义了一个包含数据的集合和一个包含列名的数组。最后,我们通过调用 toDS 方法将集合转换为 DataFrame:
import spark.implicits._ 作用是引入 SparkSession 中的隐式转换,具体功能如下:
- 启用隐式转换:SparkSession 对象 spark 是一个特殊的 Spark 入口点,通过导入 spark.implicits._,我们可以将 SparkSession 对象的隐式转换启用。这意味着 Spark 会自动将一些常见的数据类型(如 RDD、Seq 等)转换为 Spark 的 DataFrame 或 Dataset,从而使得对数据的操作更加简洁和方便。
- 提供 toDS 方法:隐式转换使得集合对象(例如 Seq、List)可以使用 toDS 方法,直接将其转换为 Dataset。这样就不需要手动定义 Row 和 Schema,Spark 会自动根据 RDD 的数据类型进行转换。
- 导入 spark.implicits._ 后,可以直接使用 Dataset 的操作,如 map、filter、groupBy 等,而不需要显式导入其他相关的类。
// 定义样例类 case class Person(id:Int, name: String, age: Int) object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() // 导入隐式转换 import spark.implicits._ // 读取原始数据,创建DataSet val data = Seq(Person(1, "老A", 30), Person(2, "大B", 25), Person(3, "小C", 22)) val ds = data.toDS() // 显示DataSet内容 ds.show() //关闭 Spark spark.stop() } }
2,运行结果
二、通过 RDD 创建 Dataset
1,RDD 转换为 Dataset
(1)下面样例使用 toDS 方法将一个已有的 RDD 转换为 DataFrame,我们需要定义一个 case class 来表示数据的结构:
// 定义样例类 case class Person(id:Int, name: String, age: Int) object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() // 导入隐式转换 import spark.implicits._ // 创建RDD(通过集合) val rdd = spark.sparkContext.makeRDD(Array( (1, "老A", 30), (2, "大B", 25), (3, "小C", 22))) // 将RDD转换为Dataset val ds: Dataset[Person] = rdd.map { case (id, name, age) => Person(id, name, age) }.toDS() // 显示Dataset内容 ds.show() //关闭 Spark spark.stop() } }
(2)运行结果如下:
2,Dataset 转换为 RDD
(1)可以使用 rdd 方法将 Dataset 转换为 RDD:
// 定义样例类 case class Person(id:Int, name: String, age: Int) object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() // 导入隐式转换 import spark.implicits._ // 创建RDD(通过集合) val rdd = spark.sparkContext.makeRDD(Array( (1, "老A", 30), (2, "大B", 25), (3, "小C", 22))) // 将RDD转换为Dataset val ds: Dataset[Person] = rdd.map { case (id, name, age) => Person(id, name, age) }.toDS() // 将Dataset转换为RDD var rdd2: RDD[Person] = ds.rdd // 显示RDD val array:Array[Person] = rdd2.collect() array.foreach(println) //关闭 Spark spark.stop() } }
(2)运行结果如下:
三、通过 DataFrame 创建 Dataset
1,DataFrame 转换为 Dataset
(1)我们可以使用 as 方法将一个已有的 DataFrame 转换为 Dataset:
// 定义样例类 case class Person(id:Int, name: String, age: Int) object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() // 导入隐式转换 import spark.implicits._ // 使用集合创建DataFrame val data = Seq((1, "老A", 30), (2, "大B", 25), (3, "小C", 22)) val columns = Seq("id", "name", "age") // 将集合转换为DataFrame val df = data.toDF(columns: _*) // 将DataFrame转换为Dataset val ds: Dataset[Person] = df.as[Person] // 显示Dataset内容 ds.show() //关闭 Spark spark.stop() } }
(2)运行结果如下:
2,Dataset 转换为 DataFrame
(1)要将 Dataset 转换为 DataFrame,可以直接使用 toDF 方法,它会将 Dataset 转换为 DataFrame,并且保留数据的结构和类型信息:// 定义样例类 case class Person(id:Int, name: String, age: Int) object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() // 导入隐式转换 import spark.implicits._ // 读取原始数据,创建DataSet val data = Seq(Person(1, "老A", 30), Person(2, "大B", 25), Person(3, "小C", 22)) val ds = data.toDS() // 将Dataset转换为DataFrame val df = ds.toDF() // 显示DataFrame内容 df.show() //关闭 Spark spark.stop() } }
(2)运行结果如下:
四、读取文件数据并创建 Dataset
1,读取 json 文件创建 Dataset
(1)假设我们项目中有个 json 文件,内容如下:
{"username":"hangge","age":20} {"username":"xiaoli","age":11} {"username":"hutu","age":33}
(2)下面代码我们将该文件加载到 DataFrame,然后通过 as 方法转换成 Dataset:
// 定义样例类 case class Person(username: String, age: BigInt) // 读取 json 文件创建 Dataset val ds = spark.read.json("datas/user.json").as[Person] ds.show()
(3)我们也可以将 DataFrame 保存为 json 文件:
// 读取 json 文件创建 Dataset val ds = spark.read.json("datas/user.json").as[Person] ds.show() // 将 Dataset 保存到json文件 ds.write.json("output")
2,读取 txt 文件创建 DataFrame
(1)假设我们项目中有个 txt 文件,内容如下:
welcome hangge.com 航歌 china
(2)下面代码我们将该文件加载到 Dataset,读取到的 Dataset 只会有一个列,列名默认为 value:
// 读取 txt 文件创建 Dataset val ds = spark.read.textFile("datas/word.txt") ds.show()
(3)我们也可以将 Dataset 保存为 txt 文件:
// 读取 txt 文件创建 Dataset val ds = spark.read.textFile("datas/word.txt") ds.show() // 将 Dataset 保存到 txt 文件 ds.write.text("output")
3,读取 csv 文件创建 Dataset
(1)假设我们项目中有个 csv 文件,内容如下:
name,age hangge,20 xiaoli,11 hutu,33
(2)下面代码我们将该文件加载到 Dataset:
// 定义样例类 case class Person(name: String, age: BigInt) // 读取 csv 文件创建 Dataset val ds = spark.read.option("sep", ",") // 列分隔符 .option("inferSchema", "true") .option("header", "true") // 是否有CSV标头 //.schema("name STRING, age INT") // 指定列名和类型 .csv("datas/user.csv") .as[Person] ds.show()
(3)我们也可以将 Dataset 保存为 csv 文件:
// 读取 csv 文件创建 Dataset val ds = spark.read.option("sep", ",") // 列分隔符 .option("inferSchema", "true") .option("header", "true") // 是否有CSV标头 //.schema("name STRING, age INT") // 指定列名和类型 .csv("datas/user.csv") .as[Person] ds.show() // 将 Dataset 保存到 csv 文件 ds.write.csv("output")
4,读取 Parquet 文件创建 Dataset
(1)Spark SQL 的默认数据源为 Parquet 格式。Parquet 是一种能够有效存储嵌套数据的列式存储格式。下面代码我们将 Parquet 文件加载到 Dataset:
parquet 对比普通的文本文件的区别:
- parquet 内置 schema(列名\ 列类型\ 是否为空)。
- 存储是以列作为存储格式。
- 存储是序列化存储在文件中的(有压缩属性体积小)。
// 读取 parquet 文件创建 Dataset val ds = spark.read.parquet("datas/user.parquet").as[Person] ds.show()
(2)下面代码将 Dataset 写入到 Parquet 文件中:
// 将 Dataset 保存到 parquet 文件 ds.write.parquet("output")
(3)Parquet 文件不能直接打开查看,如果想要查看内容,可以“Avro and Parquet View”这个插件来查看,安装后 IDEA 底部会有个“Avro/Parquet View”按钮,点击后将 Parquet 文件拖入到打开的窗口即可进行查看:
四、读取数据库数据并创建 Dataset
1,准备工作
(1)Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame 然后转换成 Dataset,通过对 Dataset 一系列的计算后,还可以将数据再写回关系型数据库中。假设 MySQL 数据库中有张 user_info 表,表中有如下数据:
(2)首先我们需要编辑项目的 pom.xml 文件,添加数据库驱动依赖:
<!-- 数据库驱动依赖 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.33</version> </dependency>
2,读取数据
(1)下面样例中我们使用了 3 种方式,从数据库中读取表数据并创建 Dataset:
// 定义样例类 case class Person(id: Int, user_name: String, pass_word: String, age: Int) // 方式1:通用的 load 方法读取 val ds1 = spark.read.format("jdbc") .option("url", "jdbc:mysql://192.168.43.96:3306/hangge") .option("driver", "com.mysql.jdbc.Driver") .option("user", "root") .option("password", "hangge1234") .option("dbtable", "user_info") .load() .as[Person] // 显示Dataset内容 ds1.show() // 方式2:通用的 load 方法读取(另一种形式参数) val ds2 = spark.read.format("jdbc") .options(Map( "url"->"jdbc:mysql://192.168.43.96:3306/hangge?user=root&password=hangge1234", "dbtable"->"user_info", "driver"->"com.mysql.jdbc.Driver")) .load() .as[Person] // 显示Dataset内容 ds2.show() // 方式3:使用 jdbc 方法读取 val props: Properties = new Properties() props.setProperty("user", "root") props.setProperty("password", "hangge1234") val ds3 = spark.read.jdbc("jdbc:mysql://192.168.43.96:3306/hangge", "user_info", props).as[Person] // 显示Dataset内容 ds3.show()
3,写入数据
(1)下面样例中我们使用了 2 种方式将 Dataset 数据写入到数据库表中:
// 定义样例类 case class Person(id: Int, user_name: String, pass_word: String, age: Int) // 读取原始数据,创建DataFrame val data1 = Seq(Person(3, "小C", "c0c0", 33)) val data2 = Seq(Person(4, "小D", "d0d0", 3)) val ds1 = data1.toDS() val ds2 = data2.toDS() //方式 1:通用的方式 format 指定写出类型 ds1.write .format("jdbc") .option("url", "jdbc:mysql://192.168.43.96:3306/hangge") .option("user", "root") .option("password", "hangge1234") .option("dbtable", "user_info") .mode(SaveMode.Append) .save() //方式 2:通过 jdbc 方法 val props: Properties = new Properties() props.setProperty("user", "root") props.setProperty("password", "hangge1234") ds2.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.43.96:3306/hangge", "user_info", props)
(2)执行后查看数据库可以看到数据已经成功插入到表中:
五、通过 Hive 创建 Dataset
1,对 Hive 的支持
Spark 支持两种方式来使用 Hive:内置的 Hive 和外部的 Hive。这两种方式在数据处理和元数据存储方面有所不同。
- 内置的 Hive:指的是在 Spark 应用程序中直接使用内置的 Hive Metastore 来存储表的元数据信息,而不依赖外部的 Hive 服务。该方式时候日常的开发学习。
- 外部的 Hive:指的是 Spark 连接到一个独立的外部 Hive Metastore 服务,该服务可能是在独立的 Hive 服务器中或是与 Hadoop 集群中的 Hive Metastore 共享。在这种方式下,Spark 只使用外部的 Hive Metastore 来存储表的元数据信息,数据处理过程与内置的 Hive 没有区别。该方式适用于生产环境。
1,使用样例
(1)下面样例演示如何将 Dataset 内容写入到 Hive 表中,并从 Hive 中读取数据创建 Dataset:
// 定义样例类 case class Person(id:Int, name: String, age: Int) object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() // 导入隐式转换 import spark.implicits._ // 读取原始数据,创建DataSet val data = Seq(Person(1, "老A", 30), Person(2, "大B", 25), Person(3, "小C", 22)) val ds = data.toDS() // 查看目前所有的Hive表 spark.sql("show tables").show() // 将Dataset内容写入到Hive表中 ds.write.saveAsTable("user_info") // 查看目前所有的Hive表 spark.sql("show tables").show() // 从Hive中读取数据创建DataFrame val hiveDS: Dataset[Person] = spark.table("user_info").as[Person] // 显示Dataset内容 hiveDS.show() //关闭 Spark spark.stop() } }
(2)执行后控制台输出结果如下:
(3)由于我使用的是内置的 Hive,可以看到项目根目录下会出现一个 spark-warehouse 文件夹,该文件夹下的的每个子文件夹是一张表,子文件夹名为表名,里面有 parquet 文件是表的数据文件。
附:SaveMode 介绍与使用
(1)Spark SQL 对于 save 操作,提供了不同的 save mode。 主要用来控制当目标位置已经有数据时应该如何处理。具体模式如下:
- SaveMode.ErrorIfExists(默认):如果目标位置已经存在数据,那么抛出一个异常
- SaveMode.Append:如果目标位置已经存在数据,那么将数据追加进去
- SaveMode.Overwrite:如果目标位置已经存在数据,那么就将已经存在的数据删除,用新数据进行覆盖
- SaveMode.Ignore:如果目标位置已经存在数据,那么就忽略,不做任何操作
注意:save 操作不会执行锁操作,并且也不是原子的,因此是有一定风险出现脏数据的。
(2)下面样例保存数据时,我们将 SaveMode 设置为 Overwrite,如果目标已存在,则覆盖。
import org.apache.spark.SparkConf import org.apache.spark.sql.{SaveMode, SparkSession} // 定义样例类 case class Person(id:Int, name: String, age: Int) object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() // 导入隐式转换 import spark.implicits._ // 读取原始数据,创建DataSet val data = Seq(Person(1, "老A", 30), Person(2, "大B", 25), Person(3, "小C", 22)) val ds = data.toDS() // 保持数据到文件 ds.write .format("csv") .mode(SaveMode.Overwrite) // 覆盖 .save("hdfs://node1:9000/out-save001") //关闭 Spark spark.stop() } }
全部评论(0)