Spark - SparkSQL使用详解1(DataFrame的创建:从RDD、文件、数据库、Hive)
作者:hangge | 2023-11-17 09:11
一、使用集合或数组直接创建 DataFrame
1,使用样例
下面代码我们定义了一个包含数据的集合和一个包含列名的数组。最后,我们通过调用 toDF 方法将集合转换为 DataFrame:
import spark.implicits._ 作用是引入 SparkSession 中的隐式转换,具体功能如下:
- 启用隐式转换:SparkSession 对象 spark 是一个特殊的 Spark 入口点,通过导入 spark.implicits._,我们可以将 SparkSession 对象的隐式转换启用。这意味着 Spark 会自动将一些常见的数据类型(如 RDD、Seq 等)转换为 Spark 的 DataFrame 或 Dataset,从而使得对数据的操作更加简洁和方便。
- 提供 toDF 方法:隐式转换使得集合对象(例如 Seq、List)可以使用 toDF 方法,直接将其转换为 DataFrame。这样,我们就可以直接从集合创建 DataFrame,而无需手动创建 Row 和 Schema。
- 启用其它 DataFrame 操作:通过隐式转换,我们可以使用一些方便的 DataFrame 操作,例如 select、filter、groupBy 等,而无需显示地导入其他相关的类。
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() // 导入隐式转换,以便可以使用toDF方法将Seq转换为DataFrame import spark.implicits._ // 使用集合创建DataFrame val data = Seq(("Alice", 30), ("Bob", 25), ("Charlie", 22)) val columns = Seq("Name", "Age") // 将集合转换为DataFrame val df = data.toDF(columns: _*) // 显示DataFrame内容 df.show() //关闭 Spark spark.stop() } }
2,运行结果
二、通过 RDD 创建 DataFrame
1,RDD 转换为 DataFrame
(1)下面样例类将 RDD 转换为 DataFrame:
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() //RDD=>DataFrame=>DataSet 转换需要引入隐式转换规则,否则无法转换 //spark 不是包名,是上下文环境对象名 import spark.implicits._ // 创建RDD(通过集合) val rdd = spark.sparkContext.makeRDD(Array( (1, "Alice"), (2, "Bob"), (3, "Charlie"), (4, "David"), (5, "Eva"))) // 将RDD转换为DataFrame val df = rdd.toDF("id", "name") // 显示 DataFrame df.show() //关闭 Spark spark.stop() } }
(2)运行结果如下:
2,DataFrame 转换为 RDD
(1)DataFrame 其实就是对 RDD 的封装,所以可以直接获取内部的 RDD,下面样例将 DataFrame 转换为 RDD:
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() //RDD=>DataFrame=>DataSet 转换需要引入隐式转换规则,否则无法转换 //spark 不是包名,是上下文环境对象名 import spark.implicits._ // 创建RDD(通过集合) val rdd = spark.sparkContext.makeRDD(Array( (1, "Alice"), (2, "Bob"), (3, "Charlie"), (4, "David"), (5, "Eva"))) // 将RDD转换为DataFrame val df = rdd.toDF("id", "name") // 将DataFrame转换为RDD var rdd2 = df.rdd // 显示RDD val array:Array[org.apache.spark.sql.Row] = rdd2.collect() println(array(0)) println(array(0)(0)) println(array(0).getAs[String]("name")) //关闭 Spark spark.stop() } }
(2)运行结果如下:
三、读取文件数据并创建 DataFrame
1,读取 json 文件创建 DataFrame
(1)假设我们项目中有个 json 文件,内容如下:
{"username":"hangge","age":20} {"username":"xiaoli","age":11} {"username":"hutu","age":33}
(2)下面代码我们将该文件加载到 DataFrame:
// 读取 json 文件创建 DataFrame val df = spark.read.json("datas/user.json") df.show()
(3)我们也可以将 DataFrame 保存为 json 文件:
// 读取 json 文件创建 DataFrame val df = spark.read.json("datas/user.json") df.show() // 将 DataFrame 保存到json文件 df.write.json("output")
2,读取 txt 文件创建 DataFrame
(1)假设我们项目中有个 txt 文件,内容如下:
welcome hangge.com 航歌 china
(2)下面代码我们将该文件加载到 DataFrame,读取到的 DataFrame 只会有一个列,列名默认称之为 value :
// 读取 txt 文件创建 DataFrame val df = spark.read.text("datas/word.txt") df.show()
(3)我们也可以将 DataFrame 保存为 txt 文件:
// 读取 txt 文件创建 DataFrame val df = spark.read.text("datas/word.txt") df.show() // 将 DataFrame 保存到 txt 文件 df.write.text("output")
3,读取 csv 文件创建 DataFrame
(1)假设我们项目中有个 csv 文件,内容如下:
name,age hangge,20 xiaoli,11 hutu,33
(2)下面代码我们将该文件加载到 DataFrame:
// 读取 csv 文件创建 DataFrame val df = spark.read.option("sep", ",") // 列分隔符 .option("inferSchema", "true") .option("header", "true") // 是否有CSV标头 //.schema("name STRING, age INT") // 指定列名和类型 .csv("datas/user.csv") df.show()
(3)我们也可以将 DataFrame 保存为 csv 文件:
// 读取 csv 文件创建 DataFrame val df = spark.read.option("sep", ",") // 列分隔符 .option("inferSchema", "true") .option("header", "true") // 是否有CSV标头 //.schema("name STRING, age INT") // 指定列名和类型 .csv("datas/user.csv") df.show() // 将 DataFrame 保存到 csv 文件 df.write.csv("output")
4,读取 Parquet 文件创建 DataFrame
(1)Spark SQL 的默认数据源为 Parquet 格式。Parquet 是一种能够有效存储嵌套数据的列式存储格式。下面代码我们将 Parquet 文件加载到 DataFrame:
parquet 对比普通的文本文件的区别:
- parquet 内置 schema(列名 \ 列类型 \ 是否为空)。
- 存储是以列作为存储格式。
- 存储是序列化存储在文件中的(有压缩属性体积小)。
// 读取 parquet 文件创建 DataFrame val df = spark.read.parquet("datas/user.parquet") df.show()
(2)下面代码将 DataFrame 写入到 Parquet 文件中:
// 将 DataFrame 保存到 parquet 文件 df.write.parquet("output")
(3)Parquet 文件不能直接打开查看,如果想要查看内容,可以“Avro and Parquet View”这个插件来查看,安装后 IDEA 底部会有个“Avro/Parquet View”按钮,点击后将 Parquet 文件拖入到打开的窗口即可进行查看:
四、读取数据库数据并创建 DataFrame
1,准备工作
(1)Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame,通过对 DataFrame 一系列的计算后,还可以将数据再写回关系型数据库中。假设 MySQL 数据库中有张 user_info 表,表中有如下数据:
(2)首先我们需要编辑项目的 pom.xml 文件,添加数据库驱动依赖:
<!-- 数据库驱动依赖 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.33</version> </dependency>
2,读取数据
(1)下面样例中我们使用了 3 种方式,从数据库中读取表数据并创建 DataFrame:
// 方式1:通用的 load 方法读取 val df1 = spark.read.format("jdbc") .option("url", "jdbc:mysql://192.168.43.96:3306/hangge") .option("driver", "com.mysql.jdbc.Driver") .option("user", "root") .option("password", "hangge1234") .option("dbtable", "user_info") .load() // 显示DataFrame内容 df1.show() // 方式2:通用的 load 方法读取(另一种形式参数) val df2 = spark.read.format("jdbc") .options(Map( "url"->"jdbc:mysql://192.168.43.96:3306/hangge?user=root&password=hangge1234", "dbtable"->"user_info", "driver"->"com.mysql.jdbc.Driver")) .load() // 显示DataFrame内容 df2.show() // 方式3:使用 jdbc 方法读取 val props: Properties = new Properties() props.setProperty("user", "root") props.setProperty("password", "hangge1234") val df3 = spark.read.jdbc("jdbc:mysql://192.168.43.96:3306/hangge", "user_info", props) // 显示DataFrame内容 df3.show()
(2)运行结果如下:
3,写入数据
(1)下面样例中我们使用了 2 种方式将 DataFrame 数据写入到数据库表中:
// 读取原始数据,创建DataFrame val data1 = Seq((3, "小C", "c0c0", 33)) val data2 = Seq((4, "小D", "d0d0", 3)) val columns = Seq("id", "user_name", "pass_word", "age") val df1 = data1.toDF(columns: _*) val df2 = data2.toDF(columns: _*) //方式 1:通用的方式 format 指定写出类型 df1.write .format("jdbc") .option("url", "jdbc:mysql://192.168.43.96:3306/hangge") .option("user", "root") .option("password", "hangge1234") .option("dbtable", "user_info") .mode(SaveMode.Append) .save() //方式 2:通过 jdbc 方法 val props: Properties = new Properties() props.setProperty("user", "root") props.setProperty("password", "hangge1234") df2.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.43.96:3306/hangge", "user_info", props)
(2)执行后查看数据库可以看到数据已经成功插入到表中:
五、通过 Hive 创建 DataFrame
1,对 Hive 的支持
Spark 支持两种方式来使用 Hive:内置的 Hive 和外部的 Hive。这两种方式在数据处理和元数据存储方面有所不同。
- 内置的 Hive:指的是在 Spark 应用程序中直接使用内置的 Hive Metastore 来存储表的元数据信息,而不依赖外部的 Hive 服务。该方式时候日常的开发学习。
- 外部的 Hive:指的是 Spark 连接到一个独立的外部 Hive Metastore 服务,该服务可能是在独立的 Hive 服务器中或是与 Hadoop 集群中的 Hive Metastore 共享。在这种方式下,Spark 只使用外部的 Hive Metastore 来存储表的元数据信息,数据处理过程与内置的 Hive 没有区别。该方式适用于生产环境。
2,使用样例
(1)下面样例演示如何将 DataFrame 内容写入到 Hive 表中,并从 Hive 中读取数据创建 DataFrame:
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() // 导入隐式转换 import spark.implicits._ // 读取原始数据,创建DataFrame val data = Seq((1, "老A", 30), (2, "大B", 25), (3, "小C", 22)) val columns = Seq("id", "name", "age") val df = data.toDF(columns: _*) // 查看目前所有的Hive表 spark.sql("show tables").show() // 将DataFrame内容写入到Hive表中 df.write.saveAsTable("user_info") // 查看目前所有的Hive表 spark.sql("show tables").show() // 从Hive中读取数据创建DataFrame val hiveDF = spark.table("user_info") // 显示DataFrame内容 hiveDF.show() //关闭 Spark spark.stop() } }
(2)执行后控制台输出结果如下:
(3)由于我使用的是内置的 Hive,可以看到项目根目录下会出现一个 spark-warehouse 文件夹,该文件夹下的的每个子文件夹是一张表,子文件夹名为表名,里面有 parquet 文件是表的数据文件。
附:SaveMode 介绍与使用
(1)Spark SQL 对于 save 操作,提供了不同的 save mode。 主要用来控制当目标位置已经有数据时应该如何处理。具体模式如下:
- SaveMode.ErrorIfExists(默认):如果目标位置已经存在数据,那么抛出一个异常
- SaveMode.Append:如果目标位置已经存在数据,那么将数据追加进去
- SaveMode.Overwrite:如果目标位置已经存在数据,那么就将已经存在的数据删除,用新数据进行覆盖
- SaveMode.Ignore:如果目标位置已经存在数据,那么就忽略,不做任何操作
注意:save 操作不会执行锁操作,并且也不是原子的,因此是有一定风险出现脏数据的。
(2)下面样例保存数据时,我们将 SaveMode 设置为 Overwrite,如果目标已存在,则覆盖。
import org.apache.spark.SparkConf import org.apache.spark.sql.{SaveMode, SparkSession} object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() // 导入隐式转换,以便可以使用toDF方法将Seq转换为DataFrame import spark.implicits._ // 使用集合创建DataFrame val data = Seq(("Alice", 30), ("Bob", 25), ("Charlie", 22)) val columns = Seq("Name", "Age") // 将集合转换为DataFrame val df = data.toDF(columns: _*) // 保持数据到文件 df.write .format("csv") .mode(SaveMode.Overwrite) // 覆盖 .save("hdfs://node1:9000/out-save001") //关闭 Spark spark.stop() } }
全部评论(0)