返回 导航

Spark

hangge.com

Spark - 使用SparkSQL将数据写入Hive表详解2(使用saveAsTable()方法)

作者:hangge | 2024-07-18 08:36
    在之前的文章中,我介绍了如何在 SparkSQL 中集成 Hive 并查询 Hive 表中的数据(点击查看)。实际工作中,我们不仅需要查询数据并计算结果,还希望将结果数据写入 Hive 表中。通常来说,向 Hive 表中写入数据有如下 3 种方法。
  • 第一种:使用 inserInto() 方法。
  • 第二种:使用 saveAsTable() 方法。
  • 第三种:使用 SparkSQL 语句。
    其中第二种不推荐使用,最常用的是第三种,用起来比较方便。我在前文介绍了第一种方法,本文接着介绍第二种使用 saveAsTable() 方法。

二、使用 saveAsTable() 方法写入 Hive 表数据

1,准备工作

(1)首先我们在 hive 中创建一张 student 表:
create table student (
    id int,
    stu_name string,
    stu_birthday date,
    online boolean
)
row format delimited
fields terminated by '\t'
lines terminated by '\n';

(2)然后给表中添加一些初始数据:
load data local inpath '/usr/local/student.txt' into table student;

2,编写代码

(1)首先我们的项目要做好 Hive 的集成配置,具体可以参考我之前写的文章:

(2)接着我们编写如下测试代码,用于通过 SparkSQL Hive student 中查询数据并将其写入到另一个 Hive student_bak。不同于使用 inserInto() 方法时要求写入的 Hive 表是要已经存在的,而 saveAsTable() 方法则不需要,这里又分两种情况:
  • 表不存在
    • 则会根据 DataFrame 中的 Schema 自动创建目标表并写入数据
  • 表存在
    • 如果 mode=append,当 DataFrame 中的 Schema 和表中的 Schema 相同(字段顺序可以不同),则执行追加操作。当 DataFrame 中的 Schema 和表中的 Schema 不相同,则报错。
    • 如果 mode=overwrite,当 DataFrame 中的 Schema 和表中的 Schema 相同(字段顺序可以不同),则直接覆盖。当 DataFrame 中的 Schema 和表中的 Schema 不相同,则会删除之前的表,然后按照 DataFrame 中的 Schema 重新创建表并写入数据。
注意:如果使用 idea 本地运行时需要通过 spark.sql.warehouse.dir 指定一下 hive 对应的 hdfs 路径全称(如果在服务器上运行则不需要该设置),否则获取到的值是 file:/user/hive/warehouse,从而造成查询目标表数据时返回的结果为空。
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkSQLWriteHive_2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")

    //获取SparkSession,为了操作SparkSQL
    val sparkSession = SparkSession
      .builder()
      .appName("SparkSQLWriteHive_2")
      .config(conf)
      //idea本地运行时需要通过spark.sql.warehouse.dir指定一下hive对应的hdfs路径全称
      .config("spark.sql.warehouse.dir", "hdfs://node1:9000/user/hive/warehouse")
      //开启对Hive的支持,支持连接Hive的MetaStore、Hive的序列化、Hive的自定义函数
      .enableHiveSupport()
      .getOrCreate()

    import sparkSession.sql
    //查询数据
    val resDf = sql("select * from student")

    //写入数据
    resDf.write
      //指定数据写入格式append:追加。overwrite:覆盖。
      .mode("overwrite")
      //这里需要指定数据格式: parquet, orc, avro(需要添加外部依赖包), json, csv, text。
      //不指定的话默认是parquet格式。
      //注意:text数据格式在这里不支持int数据类型
      //针对普通文本文件数据格式(json、csv),默认创建的Hive表是SequenceFile格式的,
      //无法读取生成的普通文件,不要使用这种方式。
      //parquet, orc数据格式可以正常使用
      .format("parquet")
      .saveAsTable("student_bak")

    sparkSession.stop()
  }
}

3,运行测试

(1)上面代码运行后,我们可以到到 Hive 中查看目标表的详细信息:
show create table student_bak;

(2)查看这个表的数据也是都有的:
select * from student_bak;

附:使用 saveAsTable() 方法的注意事项

1,text 数据格式不支持 int 数据类型

(1)我们这里验证一下,首先到 hive 中删除表同时删除对应的 hdfs 目录
drop table student_bak;

(2)然后修改代码,将 format 中的值改为 text
//写入数据(表不存在)
resDf.write
  //指定数据写入格式append:追加。overwrite:覆盖。
  .mode("overwrite")
  //这里需要指定数据格式: parquet, orc, avro(需要添加外部依赖包), json, csv, text。
  //不指定的话默认是parquet格式。
  //注意:text数据格式在这里不支持int数据类型
  //针对普通文本文件数据格式(json、csv),默认创建的Hive表是SequenceFile格式的,
  //无法读取生成的普通文件,不要使用这种方式。
  //parquet, orc数据格式可以正常使用
  .format("text")
  .saveAsTable("student_bak")

(3)重新执行程序,发现程序报错,提示说 Text 格式不支持 int 数据类型。

2,普通文本文件数据格式(json、csv)无法正常使用

(1)针对普通文本文件数据格式(jsoncsv),默认创建的 Hive 表是 SequenceFile 格式的,无法读取生成的普通文件,也无法正常使用。我们这里验证一下,首先到 hive 中删除表同时删除对应的 hdfs 目录
drop table student_bak;

(2)修改代码,将 format 中的值改为 json
//写入数据(表不存在)
resDf.write
  //指定数据写入格式append:追加。overwrite:覆盖。
  .mode("overwrite")
  //这里需要指定数据格式: parquet, orc, avro(需要添加外部依赖包), json, csv, text。
  //不指定的话默认是parquet格式。
  //注意:text数据格式在这里不支持int数据类型
  //针对普通文本文件数据格式(json、csv),默认创建的Hive表是SequenceFile格式的,
  //无法读取生成的普通文件,不要使用这种方式。
  //parquet, orc数据格式可以正常使用
  .format("json")
  .saveAsTable("student_bak")

(3)执行程序。程序虽然不报错,但是表中的数据是无法查询的。首先我们先看表结构:
show create table student_bak;

(4)查询表中的数据,发现无法查询。这是因为产生的文件是 json 文件,不是一个 SequenceFile 文件,所以无法读取。使用 csv 格式和 json 格式都会遇到同样的问题。
select * from student_bak;

3,针对已存在的表,当 mode 为 append 时,format 必须指定为 hive

(1)针对已存在的表,当 mode append 时,format 里面必须指定为 hive,否则会报错。我们这里验证一下,首先到 hive 中删除表同时删除对应的 hdfs 目录。
drop table student_bak;

(2)然后编写如下代码:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkSQLWriteHive_2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")

    //获取SparkSession,为了操作SparkSQL
    val sparkSession = SparkSession
      .builder()
      .appName("SparkSQLWriteHive_2")
      .config(conf)
      //idea本地运行时需要通过spark.sql.warehouse.dir指定一下hive对应的hdfs路径全称
      .config("spark.sql.warehouse.dir", "hdfs://node1:9000/user/hive/warehouse")
      //开启对Hive的支持,支持连接Hive的MetaStore、Hive的序列化、Hive的自定义函数
      .enableHiveSupport()
      .getOrCreate()

    import sparkSession.sql
    //查询数据
    val resDf = sql("select * from student")

    //创建目标表
    sql(
      """
        |create table if not exists student_bak(
        | id int,
        | stu_name string,
        | stu_birthday date,
        | online boolean
        |)using hive
        | OPTIONS(
        |   fileFormat 'textfile',
        |   fieldDelim '\t'
        |   )
        |""".stripMargin)

    //写入数据(表存在 )
    resDf.write
      //指定数据写入格式append:追加。overwrite:覆盖。
      .mode("append")
      //这里需要指定数据格式: parquet, orc, avro(需要添加外部依赖包), json, csv, text。
      //不指定的话默认是parquet格式。
      //注意:text数据格式在这里不支持int数据类型
      //针对普通文本文件数据格式(json、csv),默认创建的Hive表是SequenceFile格式的,
      //无法读取生成的普通文件,不要使用这种方式。
      //parquet, orc数据格式可以正常使用
      .format("text")
      .saveAsTable("student_bak")

    sparkSession.stop()
  }
}

(3)执行程序,发现执行报错,提示这个表是 HiveFileFormat,不支持 TextDataSourceV2 格式。
提示:要解决这个问题只需将 format 中的参数置指定为 hive 即可。
评论

全部评论(0)

回到顶部