返回 导航

Spark

hangge.com

Spark - SparkSQL使用详解2(Dataset的创建:从RDD、DataFrame、数据库、Hive)

作者:hangge | 2023-11-20 08:54

一、使用集合或数组直接创建 DataSet

1,使用样例

下面代码我们定义了一个包含数据的集合和一个包含列名的数组。最后,我们通过调用 toDS 方法将集合转换为 DataFrame
import spark.implicits._ 作用是引入 SparkSession 中的隐式转换,具体功能如下:
  • 启用隐式转换SparkSession 对象 spark 是一个特殊的 Spark 入口点,通过导入 spark.implicits._,我们可以将 SparkSession 对象的隐式转换启用。这意味着 Spark 会自动将一些常见的数据类型(如 RDDSeq 等)转换为 SparkDataFrameDataset,从而使得对数据的操作更加简洁和方便。
  • 提供 toDS 方法:隐式转换使得集合对象(例如 SeqList)可以使用 toDS 方法,直接将其转换为 Dataset。这样就不需要手动定义 RowSchemaSpark 会自动根据 RDD 的数据类型进行转换。
  • 导入 spark.implicits._ 后,可以直接使用 Dataset 的操作,如 mapfiltergroupBy 等,而不需要显式导入其他相关的类。
// 定义样例类
case class Person(id:Int, name: String, age: Int)

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 读取原始数据,创建DataSet
    val data = Seq(Person(1, "老A", 30), Person(2, "大B", 25), Person(3, "小C", 22))
    val ds = data.toDS()

    // 显示DataSet内容
    ds.show()

    //关闭 Spark
    spark.stop()
  }
}

2,运行结果


二、通过 RDD 创建 Dataset

1,RDD 转换为 Dataset

(1)下面样例使用 toDS 方法将一个已有的 RDD 转换为 DataFrame,我们需要定义一个 case class 来表示数据的结构:
// 定义样例类
case class Person(id:Int, name: String, age: Int)

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建RDD(通过集合)
    val rdd = spark.sparkContext.makeRDD(Array(
      (1, "老A", 30),
      (2, "大B", 25),
      (3, "小C", 22)))

    // 将RDD转换为Dataset
    val ds: Dataset[Person] = rdd.map {
      case (id, name, age) => Person(id, name, age)
    }.toDS()

    // 显示Dataset内容
    ds.show()

    //关闭 Spark
    spark.stop()
  }
}
(2)运行结果如下:

2,Dataset 转换为 RDD

(1)可以使用 rdd 方法将 Dataset 转换为 RDD
// 定义样例类
case class Person(id:Int, name: String, age: Int)

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建RDD(通过集合)
    val rdd = spark.sparkContext.makeRDD(Array(
      (1, "老A", 30),
      (2, "大B", 25),
      (3, "小C", 22)))

    // 将RDD转换为Dataset
    val ds: Dataset[Person] = rdd.map {
      case (id, name, age) => Person(id, name, age)
    }.toDS()

    // 将Dataset转换为RDD
    var rdd2: RDD[Person] = ds.rdd

    // 显示RDD
    val array:Array[Person] = rdd2.collect()
    array.foreach(println)

    //关闭 Spark
    spark.stop()
  }
}
(2)运行结果如下:

三、通过 DataFrame 创建 Dataset

1,DataFrame 转换为 Dataset

(1)我们可以使用 as 方法将一个已有的 DataFrame 转换为 Dataset
// 定义样例类
case class Person(id:Int, name: String, age: Int)

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 使用集合创建DataFrame
    val data = Seq((1, "老A", 30), (2, "大B", 25), (3, "小C", 22))
    val columns = Seq("id", "name", "age")

    // 将集合转换为DataFrame
    val df = data.toDF(columns: _*)

    // 将DataFrame转换为Dataset
    val ds: Dataset[Person] = df.as[Person]

    // 显示Dataset内容
    ds.show()

    //关闭 Spark
    spark.stop()
  }
}

(2)运行结果如下:

2,Dataset 转换为 DataFrame

(1)要将 Dataset 转换为 DataFrame,可以直接使用 toDF 方法,它会将 Dataset 转换为 DataFrame,并且保留数据的结构和类型信息:
// 定义样例类
case class Person(id:Int, name: String, age: Int)

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 读取原始数据,创建DataSet
    val data = Seq(Person(1, "老A", 30), Person(2, "大B", 25), Person(3, "小C", 22))
    val ds = data.toDS()

    // 将Dataset转换为DataFrame
    val df = ds.toDF()

    // 显示DataFrame内容
    df.show()
    
    //关闭 Spark
    spark.stop()
  }
}

(2)运行结果如下:

四、读取文件数据并创建 Dataset

1,读取 json 文件创建 Dataset

(1)假设我们项目中有个 json 文件,内容如下:
{"username":"hangge","age":20}
{"username":"xiaoli","age":11}
{"username":"hutu","age":33}

(2)下面代码我们将该文件加载到 DataFrame,然后通过 as 方法转换成 Dataset
// 定义样例类
case class Person(username: String, age: BigInt)

// 读取 json 文件创建 Dataset
val ds = spark.read.json("datas/user.json").as[Person]
ds.show()

(3)我们也可以将 DataFrame 保存为 json 文件:
// 读取 json 文件创建 Dataset
val ds = spark.read.json("datas/user.json").as[Person]
ds.show()
// 将 Dataset 保存到json文件
ds.write.json("output")

2,读取 txt 文件创建 DataFrame

(1)假设我们项目中有个 txt 文件,内容如下:
welcome hangge.com
航歌
china

(2)下面代码我们将该文件加载到 Dataset,读取到的 Dataset 只会有一个列,列名默认为 value
// 读取 txt 文件创建 Dataset
val ds = spark.read.textFile("datas/word.txt")
ds.show()

(3)我们也可以将 Dataset 保存为 txt 文件:
// 读取 txt 文件创建 Dataset
val ds = spark.read.textFile("datas/word.txt")
ds.show()
// 将 Dataset 保存到 txt 文件
ds.write.text("output")

3,读取 csv 文件创建 Dataset

(1)假设我们项目中有个 csv 文件,内容如下:
name,age
hangge,20
xiaoli,11
hutu,33

(2)下面代码我们将该文件加载到 Dataset
// 定义样例类
case class Person(name: String, age: BigInt)

// 读取 csv 文件创建 Dataset
val ds = spark.read.option("sep", ",") // 列分隔符
  .option("inferSchema", "true")
  .option("header", "true") // 是否有CSV标头
  //.schema("name STRING, age INT") // 指定列名和类型
  .csv("datas/user.csv")
  .as[Person]
ds.show()

(3)我们也可以将 Dataset 保存为 csv 文件:
// 读取 csv 文件创建 Dataset
val ds = spark.read.option("sep", ",") // 列分隔符
  .option("inferSchema", "true")
  .option("header", "true") // 是否有CSV标头
  //.schema("name STRING, age INT") // 指定列名和类型
  .csv("datas/user.csv")
  .as[Person]
ds.show()
// 将 Dataset 保存到 csv 文件
ds.write.csv("output")

4,读取 Parquet 文件创建 Dataset

(1)Spark SQL 的默认数据源为 Parquet 格式。Parquet 是一种能够有效存储嵌套数据的列式存储格式。下面代码我们将 Parquet 文件加载到 Dataset
parquet 对比普通的文本文件的区别:
  • parquet 内置 schema(列名\ 列类型\ 是否为空)。
  • 存储是以列作为存储格式。
  • 存储是序列化存储在文件中的(有压缩属性体积小)。
// 读取 parquet 文件创建 Dataset
val ds = spark.read.parquet("datas/user.parquet").as[Person]
ds.show()

(2)下面代码将 Dataset 写入到 Parquet 文件中:
// 将 Dataset 保存到 parquet 文件
ds.write.parquet("output")

(3)Parquet 文件不能直接打开查看,如果想要查看内容,可以“Avro and Parquet View”这个插件来查看,安装后 IDEA 底部会有个“Avro/Parquet View”按钮,点击后将 Parquet 文件拖入到打开的窗口即可进行查看: 
         

四、读取数据库数据并创建 Dataset

1,准备工作

(1)Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame 然后转换成 Dataset,通过对 Dataset 一系列的计算后,还可以将数据再写回关系型数据库中。假设 MySQL 数据库中有张 user_info 表,表中有如下数据:

(2)首先我们需要编辑项目的 pom.xml 文件,添加数据库驱动依赖:
<!-- 数据库驱动依赖 -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.33</version>
</dependency>

2,读取数据

(1)下面样例中我们使用了 3 种方式,从数据库中读取表数据并创建 Dataset
// 定义样例类
case class Person(id: Int, user_name: String, pass_word: String, age: Int)

// 方式1:通用的 load 方法读取
val ds1 = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://192.168.43.96:3306/hangge")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("user", "root")
  .option("password", "hangge1234")
  .option("dbtable", "user_info")
  .load()
  .as[Person]
// 显示Dataset内容
ds1.show()

// 方式2:通用的 load 方法读取(另一种形式参数)
val ds2 = spark.read.format("jdbc")
  .options(Map(
    "url"->"jdbc:mysql://192.168.43.96:3306/hangge?user=root&password=hangge1234",
    "dbtable"->"user_info",
    "driver"->"com.mysql.jdbc.Driver"))
  .load()
  .as[Person]
// 显示Dataset内容
ds2.show()

// 方式3:使用 jdbc 方法读取
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "hangge1234")
val ds3 = spark.read.jdbc("jdbc:mysql://192.168.43.96:3306/hangge", "user_info", props).as[Person]
// 显示Dataset内容
ds3.show()

(2)运行结果如下:

3,写入数据

(1)下面样例中我们使用了 2 种方式将 Dataset 数据写入到数据库表中:
// 定义样例类
case class Person(id: Int, user_name: String, pass_word: String, age: Int)

// 读取原始数据,创建DataFrame
val data1 = Seq(Person(3, "小C", "c0c0", 33))
val data2 = Seq(Person(4, "小D", "d0d0", 3))
val ds1 = data1.toDS()
val ds2 = data2.toDS()

//方式 1:通用的方式 format 指定写出类型
ds1.write
  .format("jdbc")
  .option("url", "jdbc:mysql://192.168.43.96:3306/hangge")
  .option("user", "root")
  .option("password", "hangge1234")
  .option("dbtable", "user_info")
  .mode(SaveMode.Append)
  .save()

//方式 2:通过 jdbc 方法
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "hangge1234")
ds2.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.43.96:3306/hangge", "user_info", props)

(2)执行后查看数据库可以看到数据已经成功插入到表中:

五、通过 Hive 创建 Dataset

1,对 Hive 的支持

Spark 支持两种方式来使用 Hive:内置的 Hive 和外部的 Hive。这两种方式在数据处理和元数据存储方面有所不同。
  • 内置的 Hive:指的是在 Spark 应用程序中直接使用内置的 Hive Metastore 来存储表的元数据信息,而不依赖外部的 Hive 服务。该方式时候日常的开发学习。
  • 外部的 Hive:指的是 Spark 连接到一个独立的外部 Hive Metastore 服务,该服务可能是在独立的 Hive 服务器中或是与 Hadoop 集群中的 Hive Metastore 共享。在这种方式下,Spark 只使用外部的 Hive Metastore 来存储表的元数据信息,数据处理过程与内置的 Hive 没有区别。该方式适用于生产环境。

1,使用样例

(1)下面样例演示如何将 Dataset 内容写入到 Hive 表中,并从 Hive 中读取数据创建 Dataset
// 定义样例类
case class Person(id:Int, name: String, age: Int)

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 读取原始数据,创建DataSet
    val data = Seq(Person(1, "老A", 30), Person(2, "大B", 25), Person(3, "小C", 22))
    val ds = data.toDS()

    // 查看目前所有的Hive表
    spark.sql("show tables").show()

    // 将Dataset内容写入到Hive表中
    ds.write.saveAsTable("user_info")

    // 查看目前所有的Hive表
    spark.sql("show tables").show()

    // 从Hive中读取数据创建DataFrame
    val hiveDS: Dataset[Person] = spark.table("user_info").as[Person]
    // 显示Dataset内容
    hiveDS.show()

    //关闭 Spark
    spark.stop()
  }
}

(2)执行后控制台输出结果如下:

(3)由于我使用的是内置的 Hive,可以看到项目根目录下会出现一个 spark-warehouse 文件夹,该文件夹下的的每个子文件夹是一张表,子文件夹名为表名,里面有 parquet 文件是表的数据文件。

附:SaveMode 介绍与使用

(1)Spark SQL 对于 save 操作,提供了不同的 save mode。 主要用来控制当目标位置已经有数据时应该如何处理。具体模式如下:
  • SaveMode.ErrorIfExists(默认):如果目标位置已经存在数据,那么抛出一个异常
  • SaveMode.Append:如果目标位置已经存在数据,那么将数据追加进去
  • SaveMode.Overwrite:如果目标位置已经存在数据,那么就将已经存在的数据删除,用新数据进行覆盖
  • SaveMode.Ignore:如果目标位置已经存在数据,那么就忽略,不做任何操作
注意save 操作不会执行锁操作,并且也不是原子的,因此是有一定风险出现脏数据的。

(2)下面样例保存数据时,我们将 SaveMode 设置为 Overwrite,如果目标已存在,则覆盖。
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

// 定义样例类
case class Person(id:Int, name: String, age: Int)

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 读取原始数据,创建DataSet
    val data = Seq(Person(1, "老A", 30), Person(2, "大B", 25), Person(3, "小C", 22))
    val ds = data.toDS()

    // 保持数据到文件
    ds.write
      .format("csv")
      .mode(SaveMode.Overwrite) // 覆盖
      .save("hdfs://node1:9000/out-save001")

    //关闭 Spark
    spark.stop()
  }
}
评论

全部评论(0)

回到顶部