返回 导航

Spark

hangge.com

Spark - SparkSQL使用详解1(DataFrame的创建:从RDD、文件、数据库、Hive)

作者:hangge | 2023-11-17 09:11

一、使用集合或数组直接创建 DataFrame

1,使用样例

下面代码我们定义了一个包含数据的集合和一个包含列名的数组。最后,我们通过调用 toDF 方法将集合转换为 DataFrame
import spark.implicits._ 作用是引入 SparkSession 中的隐式转换,具体功能如下:
  • 启用隐式转换SparkSession 对象 spark 是一个特殊的 Spark 入口点,通过导入 spark.implicits._,我们可以将 SparkSession 对象的隐式转换启用。这意味着 Spark 会自动将一些常见的数据类型(如 RDDSeq 等)转换为 SparkDataFrameDataset,从而使得对数据的操作更加简洁和方便。
  • 提供 toDF 方法:隐式转换使得集合对象(例如 SeqList)可以使用 toDF 方法,直接将其转换为 DataFrame。这样,我们就可以直接从集合创建 DataFrame,而无需手动创建 RowSchema
  • 启用其它 DataFrame 操作:通过隐式转换,我们可以使用一些方便的 DataFrame 操作,例如 selectfiltergroupBy 等,而无需显示地导入其他相关的类。
object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // 导入隐式转换,以便可以使用toDF方法将Seq转换为DataFrame
    import spark.implicits._

    // 使用集合创建DataFrame
    val data = Seq(("Alice", 30), ("Bob", 25), ("Charlie", 22))
    val columns = Seq("Name", "Age")

    // 将集合转换为DataFrame
    val df = data.toDF(columns: _*)

    // 显示DataFrame内容
    df.show()

    //关闭 Spark
    spark.stop()
  }
}

2,运行结果


二、通过 RDD 创建 DataFrame

1,RDD 转换为 DataFrame

(1)下面样例类将 RDD 转换为 DataFrame
object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    //RDD=>DataFrame=>DataSet 转换需要引入隐式转换规则,否则无法转换
    //spark 不是包名,是上下文环境对象名
    import spark.implicits._

    // 创建RDD(通过集合)
    val rdd = spark.sparkContext.makeRDD(Array(
      (1, "Alice"),
      (2, "Bob"),
      (3, "Charlie"),
      (4, "David"),
      (5, "Eva")))

    // 将RDD转换为DataFrame
    val df = rdd.toDF("id", "name")

    // 显示 DataFrame
    df.show()

    //关闭 Spark
    spark.stop()
  }
}
(2)运行结果如下:

2,DataFrame 转换为 RDD

(1)DataFrame 其实就是对 RDD 的封装,所以可以直接获取内部的 RDD,下面样例将 DataFrame 转换为 RDD
object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    //RDD=>DataFrame=>DataSet 转换需要引入隐式转换规则,否则无法转换
    //spark 不是包名,是上下文环境对象名
    import spark.implicits._

    // 创建RDD(通过集合)
    val rdd = spark.sparkContext.makeRDD(Array(
      (1, "Alice"),
      (2, "Bob"),
      (3, "Charlie"),
      (4, "David"),
      (5, "Eva")))

    // 将RDD转换为DataFrame
    val df = rdd.toDF("id", "name")

    // 将DataFrame转换为RDD
    var rdd2 = df.rdd

    // 显示RDD
    val array:Array[org.apache.spark.sql.Row] = rdd2.collect()
    println(array(0))
    println(array(0)(0))
    println(array(0).getAs[String]("name"))

    //关闭 Spark
    spark.stop()
  }
}
(2)运行结果如下:

三、读取文件数据并创建 DataFrame

1,读取 json 文件创建 DataFrame

(1)假设我们项目中有个 json 文件,内容如下:
{"username":"hangge","age":20}
{"username":"xiaoli","age":11}
{"username":"hutu","age":33}

(2)下面代码我们将该文件加载到 DataFrame
// 读取 json 文件创建 DataFrame
val df = spark.read.json("datas/user.json")
df.show()

(3)我们也可以将 DataFrame 保存为 json 文件:
// 读取 json 文件创建 DataFrame
val df = spark.read.json("datas/user.json")
df.show()
// 将 DataFrame 保存到json文件
df.write.json("output")

2,读取 txt 文件创建 DataFrame

(1)假设我们项目中有个 txt 文件,内容如下:
welcome hangge.com
航歌
china

(2)下面代码我们将该文件加载到 DataFrame,读取到的 DataFrame 只会有一个列,列名默认称之为 value 
// 读取 txt 文件创建 DataFrame
val df = spark.read.text("datas/word.txt")
df.show()

(3)我们也可以将 DataFrame 保存为 txt 文件:
// 读取 txt 文件创建 DataFrame
val df = spark.read.text("datas/word.txt")
df.show()
// 将 DataFrame 保存到 txt 文件
df.write.text("output")

3,读取 csv 文件创建 DataFrame

(1)假设我们项目中有个 csv 文件,内容如下:
name,age
hangge,20
xiaoli,11
hutu,33

(2)下面代码我们将该文件加载到 DataFrame
// 读取 csv 文件创建 DataFrame
val df = spark.read.option("sep", ",") // 列分隔符
  .option("inferSchema", "true")
  .option("header", "true") // 是否有CSV标头
  //.schema("name STRING, age INT") // 指定列名和类型
  .csv("datas/user.csv")
df.show()

(3)我们也可以将 DataFrame 保存为 csv 文件:
// 读取 csv 文件创建 DataFrame
val df = spark.read.option("sep", ",") // 列分隔符
  .option("inferSchema", "true")
  .option("header", "true") // 是否有CSV标头
  //.schema("name STRING, age INT") // 指定列名和类型
  .csv("datas/user.csv")
df.show()
// 将 DataFrame 保存到 csv 文件
df.write.csv("output")

4,读取 Parquet 文件创建 DataFrame

(1)Spark SQL 的默认数据源为 Parquet 格式。Parquet 是一种能够有效存储嵌套数据的列式存储格式。下面代码我们将 Parquet 文件加载到 DataFrame
parquet 对比普通的文本文件的区别:
  • parquet 内置 schema(列名 \ 列类型 \ 是否为空)。
  • 存储是以列作为存储格式。
  • 存储是序列化存储在文件中的(有压缩属性体积小)。
// 读取 parquet 文件创建 DataFrame
val df = spark.read.parquet("datas/user.parquet")
df.show()

(2)下面代码将 DataFrame 写入到 Parquet 文件中:
// 将 DataFrame 保存到 parquet 文件
df.write.parquet("output")

(3)Parquet 文件不能直接打开查看,如果想要查看内容,可以“Avro and Parquet View”这个插件来查看,安装后 IDEA 底部会有个“Avro/Parquet View”按钮,点击后将 Parquet 文件拖入到打开的窗口即可进行查看: 
         

四、读取数据库数据并创建 DataFrame

1,准备工作

(1)Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame,通过对 DataFrame 一系列的计算后,还可以将数据再写回关系型数据库中。假设 MySQL 数据库中有张 user_info 表,表中有如下数据:

(2)首先我们需要编辑项目的 pom.xml 文件,添加数据库驱动依赖:
<!-- 数据库驱动依赖 -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.33</version>
</dependency>

2,读取数据

(1)下面样例中我们使用了 3 种方式,从数据库中读取表数据并创建 DataFrame
// 方式1:通用的 load 方法读取
val df1 = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://192.168.43.96:3306/hangge")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("user", "root")
  .option("password", "hangge1234")
  .option("dbtable", "user_info")
  .load()
// 显示DataFrame内容
df1.show()

// 方式2:通用的 load 方法读取(另一种形式参数)
val df2 = spark.read.format("jdbc")
  .options(Map(
    "url"->"jdbc:mysql://192.168.43.96:3306/hangge?user=root&password=hangge1234",
    "dbtable"->"user_info",
    "driver"->"com.mysql.jdbc.Driver"))
  .load()
// 显示DataFrame内容
df2.show()

// 方式3:使用 jdbc 方法读取
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "hangge1234")
val df3 = spark.read.jdbc("jdbc:mysql://192.168.43.96:3306/hangge", "user_info", props)
// 显示DataFrame内容
df3.show()

(2)运行结果如下:

3,写入数据

(1)下面样例中我们使用了 2 种方式将 DataFrame 数据写入到数据库表中:
// 读取原始数据,创建DataFrame
val data1 = Seq((3, "小C", "c0c0", 33))
val data2 = Seq((4, "小D", "d0d0", 3))
val columns = Seq("id", "user_name", "pass_word", "age")
val df1 = data1.toDF(columns: _*)
val df2 = data2.toDF(columns: _*)

//方式 1:通用的方式 format 指定写出类型
df1.write
  .format("jdbc")
  .option("url", "jdbc:mysql://192.168.43.96:3306/hangge")
  .option("user", "root")
  .option("password", "hangge1234")
  .option("dbtable", "user_info")
  .mode(SaveMode.Append)
  .save()

//方式 2:通过 jdbc 方法
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "hangge1234")
df2.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.43.96:3306/hangge", "user_info", props)

(2)执行后查看数据库可以看到数据已经成功插入到表中:

五、通过 Hive 创建 DataFrame

1,对 Hive 的支持

Spark 支持两种方式来使用 Hive:内置的 Hive 和外部的 Hive。这两种方式在数据处理和元数据存储方面有所不同。
  • 内置的 Hive:指的是在 Spark 应用程序中直接使用内置的 Hive Metastore 来存储表的元数据信息,而不依赖外部的 Hive 服务。该方式时候日常的开发学习。
  • 外部的 Hive:指的是 Spark 连接到一个独立的外部 Hive Metastore 服务,该服务可能是在独立的 Hive 服务器中或是与 Hadoop 集群中的 Hive Metastore 共享。在这种方式下,Spark 只使用外部的 Hive Metastore 来存储表的元数据信息,数据处理过程与内置的 Hive 没有区别。该方式适用于生产环境。

2,使用样例

(1)下面样例演示如何将 DataFrame 内容写入到 Hive 表中,并从 Hive 中读取数据创建 DataFrame
object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 读取原始数据,创建DataFrame
    val data = Seq((1, "老A", 30), (2, "大B", 25), (3, "小C", 22))
    val columns = Seq("id", "name", "age")
    val df = data.toDF(columns: _*)

    // 查看目前所有的Hive表
    spark.sql("show tables").show()

    // 将DataFrame内容写入到Hive表中
    df.write.saveAsTable("user_info")

    // 查看目前所有的Hive表
    spark.sql("show tables").show()

    // 从Hive中读取数据创建DataFrame
    val hiveDF = spark.table("user_info")
    // 显示DataFrame内容
    hiveDF.show()

    //关闭 Spark
    spark.stop()
  }
}

(2)执行后控制台输出结果如下:

(3)由于我使用的是内置的 Hive,可以看到项目根目录下会出现一个 spark-warehouse 文件夹,该文件夹下的的每个子文件夹是一张表,子文件夹名为表名,里面有 parquet 文件是表的数据文件。

附:SaveMode 介绍与使用

(1)Spark SQL 对于 save 操作,提供了不同的 save mode。 主要用来控制当目标位置已经有数据时应该如何处理。具体模式如下:
  • SaveMode.ErrorIfExists(默认):如果目标位置已经存在数据,那么抛出一个异常
  • SaveMode.Append:如果目标位置已经存在数据,那么将数据追加进去
  • SaveMode.Overwrite:如果目标位置已经存在数据,那么就将已经存在的数据删除,用新数据进行覆盖
  • SaveMode.Ignore:如果目标位置已经存在数据,那么就忽略,不做任何操作
注意save 操作不会执行锁操作,并且也不是原子的,因此是有一定风险出现脏数据的。

(2)下面样例保存数据时,我们将 SaveMode 设置为 Overwrite,如果目标已存在,则覆盖。
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // 导入隐式转换,以便可以使用toDF方法将Seq转换为DataFrame
    import spark.implicits._

    // 使用集合创建DataFrame
    val data = Seq(("Alice", 30), ("Bob", 25), ("Charlie", 22))
    val columns = Seq("Name", "Age")

    // 将集合转换为DataFrame
    val df = data.toDF(columns: _*)

    // 保持数据到文件
    df.write
      .format("csv")
      .mode(SaveMode.Overwrite) // 覆盖
      .save("hdfs://node1:9000/out-save001")

    //关闭 Spark
    spark.stop()
  }
}
评论

全部评论(0)

回到顶部