Spark - 使用SparkSQL将数据写入Hive表详解2(使用saveAsTable()方法)
作者:hangge | 2024-07-18 08:36
在之前的文章中,我介绍了如何在 SparkSQL 中集成 Hive 并查询 Hive 表中的数据(点击查看)。实际工作中,我们不仅需要查询数据并计算结果,还希望将结果数据写入 Hive 表中。通常来说,向 Hive 表中写入数据有如下 3 种方法。
(2)然后给表中添加一些初始数据:
(2)查看这个表的数据也是都有的:
(2)然后修改代码,将 format 中的值改为 text:
(3)重新执行程序,发现程序报错,提示说 Text 格式不支持 int 数据类型。
(2)修改代码,将 format 中的值改为 json:
(3)执行程序。程序虽然不报错,但是表中的数据是无法查询的。首先我们先看表结构:
(2)然后编写如下代码:
(3)执行程序,发现执行报错,提示这个表是 HiveFileFormat,不支持 TextDataSourceV2 格式。
- 第一种:使用 inserInto() 方法。
- 第二种:使用 saveAsTable() 方法。
- 第三种:使用 SparkSQL 语句。
其中第二种不推荐使用,最常用的是第三种,用起来比较方便。我在前文介绍了第一种方法,本文接着介绍第二种使用 saveAsTable() 方法。
二、使用 saveAsTable() 方法写入 Hive 表数据
1,准备工作
(1)首先我们在 hive 中创建一张 student 表:
create table student (
id int,
stu_name string,
stu_birthday date,
online boolean
)
row format delimited
fields terminated by '\t'
lines terminated by '\n';
(2)然后给表中添加一些初始数据:
load data local inpath '/usr/local/student.txt' into table student;
2,编写代码
(1)首先我们的项目要做好 Hive 的集成配置,具体可以参考我之前写的文章:
(2)接着我们编写如下测试代码,用于通过 SparkSQL 从 Hive 表 student 中查询数据并将其写入到另一个 Hive 表 student_bak。不同于使用 inserInto() 方法时要求写入的 Hive 表是要已经存在的,而 saveAsTable() 方法则不需要,这里又分两种情况:
- 表不存在:
- 则会根据 DataFrame 中的 Schema 自动创建目标表并写入数据
- 表存在:
- 如果 mode=append,当 DataFrame 中的 Schema 和表中的 Schema 相同(字段顺序可以不同),则执行追加操作。当 DataFrame 中的 Schema 和表中的 Schema 不相同,则报错。
- 如果 mode=overwrite,当 DataFrame 中的 Schema 和表中的 Schema 相同(字段顺序可以不同),则直接覆盖。当 DataFrame 中的 Schema 和表中的 Schema 不相同,则会删除之前的表,然后按照 DataFrame 中的 Schema 重新创建表并写入数据。
注意:如果使用 idea 本地运行时需要通过 spark.sql.warehouse.dir 指定一下 hive 对应的 hdfs 路径全称(如果在服务器上运行则不需要该设置),否则获取到的值是 file:/user/hive/warehouse,从而造成查询目标表数据时返回的结果为空。
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object SparkSQLWriteHive_2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
//获取SparkSession,为了操作SparkSQL
val sparkSession = SparkSession
.builder()
.appName("SparkSQLWriteHive_2")
.config(conf)
//idea本地运行时需要通过spark.sql.warehouse.dir指定一下hive对应的hdfs路径全称
.config("spark.sql.warehouse.dir", "hdfs://node1:9000/user/hive/warehouse")
//开启对Hive的支持,支持连接Hive的MetaStore、Hive的序列化、Hive的自定义函数
.enableHiveSupport()
.getOrCreate()
import sparkSession.sql
//查询数据
val resDf = sql("select * from student")
//写入数据
resDf.write
//指定数据写入格式append:追加。overwrite:覆盖。
.mode("overwrite")
//这里需要指定数据格式: parquet, orc, avro(需要添加外部依赖包), json, csv, text。
//不指定的话默认是parquet格式。
//注意:text数据格式在这里不支持int数据类型
//针对普通文本文件数据格式(json、csv),默认创建的Hive表是SequenceFile格式的,
//无法读取生成的普通文件,不要使用这种方式。
//parquet, orc数据格式可以正常使用
.format("parquet")
.saveAsTable("student_bak")
sparkSession.stop()
}
}
3,运行测试
(1)上面代码运行后,我们可以到到 Hive 中查看目标表的详细信息:
show create table student_bak;
(2)查看这个表的数据也是都有的:
select * from student_bak;

附:使用 saveAsTable() 方法的注意事项
1,text 数据格式不支持 int 数据类型
(1)我们这里验证一下,首先到 hive 中删除表同时删除对应的 hdfs 目录
drop table student_bak;
(2)然后修改代码,将 format 中的值改为 text:
//写入数据(表不存在)
resDf.write
//指定数据写入格式append:追加。overwrite:覆盖。
.mode("overwrite")
//这里需要指定数据格式: parquet, orc, avro(需要添加外部依赖包), json, csv, text。
//不指定的话默认是parquet格式。
//注意:text数据格式在这里不支持int数据类型
//针对普通文本文件数据格式(json、csv),默认创建的Hive表是SequenceFile格式的,
//无法读取生成的普通文件,不要使用这种方式。
//parquet, orc数据格式可以正常使用
.format("text")
.saveAsTable("student_bak")
(3)重新执行程序,发现程序报错,提示说 Text 格式不支持 int 数据类型。

2,普通文本文件数据格式(json、csv)无法正常使用
(1)针对普通文本文件数据格式(json、csv),默认创建的 Hive 表是 SequenceFile 格式的,无法读取生成的普通文件,也无法正常使用。我们这里验证一下,首先到 hive 中删除表同时删除对应的 hdfs 目录drop table student_bak;
(2)修改代码,将 format 中的值改为 json:
//写入数据(表不存在)
resDf.write
//指定数据写入格式append:追加。overwrite:覆盖。
.mode("overwrite")
//这里需要指定数据格式: parquet, orc, avro(需要添加外部依赖包), json, csv, text。
//不指定的话默认是parquet格式。
//注意:text数据格式在这里不支持int数据类型
//针对普通文本文件数据格式(json、csv),默认创建的Hive表是SequenceFile格式的,
//无法读取生成的普通文件,不要使用这种方式。
//parquet, orc数据格式可以正常使用
.format("json")
.saveAsTable("student_bak")
(3)执行程序。程序虽然不报错,但是表中的数据是无法查询的。首先我们先看表结构:
show create table student_bak;
- 此时发现表中只有一列,并且表的输入和输出格式化类是 SequenceFile。
(4)查询表中的数据,发现无法查询。这是因为产生的文件是 json 文件,不是一个 SequenceFile 文件,所以无法读取。使用 csv 格式和 json 格式都会遇到同样的问题。
select * from student_bak;
3,针对已存在的表,当 mode 为 append 时,format 必须指定为 hive
(1)针对已存在的表,当 mode 为 append 时,format 里面必须指定为 hive,否则会报错。我们这里验证一下,首先到 hive 中删除表同时删除对应的 hdfs 目录。
drop table student_bak;
(2)然后编写如下代码:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object SparkSQLWriteHive_2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
//获取SparkSession,为了操作SparkSQL
val sparkSession = SparkSession
.builder()
.appName("SparkSQLWriteHive_2")
.config(conf)
//idea本地运行时需要通过spark.sql.warehouse.dir指定一下hive对应的hdfs路径全称
.config("spark.sql.warehouse.dir", "hdfs://node1:9000/user/hive/warehouse")
//开启对Hive的支持,支持连接Hive的MetaStore、Hive的序列化、Hive的自定义函数
.enableHiveSupport()
.getOrCreate()
import sparkSession.sql
//查询数据
val resDf = sql("select * from student")
//创建目标表
sql(
"""
|create table if not exists student_bak(
| id int,
| stu_name string,
| stu_birthday date,
| online boolean
|)using hive
| OPTIONS(
| fileFormat 'textfile',
| fieldDelim '\t'
| )
|""".stripMargin)
//写入数据(表存在 )
resDf.write
//指定数据写入格式append:追加。overwrite:覆盖。
.mode("append")
//这里需要指定数据格式: parquet, orc, avro(需要添加外部依赖包), json, csv, text。
//不指定的话默认是parquet格式。
//注意:text数据格式在这里不支持int数据类型
//针对普通文本文件数据格式(json、csv),默认创建的Hive表是SequenceFile格式的,
//无法读取生成的普通文件,不要使用这种方式。
//parquet, orc数据格式可以正常使用
.format("text")
.saveAsTable("student_bak")
sparkSession.stop()
}
}
(3)执行程序,发现执行报错,提示这个表是 HiveFileFormat,不支持 TextDataSourceV2 格式。
提示:要解决这个问题只需将 format 中的参数置指定为 hive 即可。
全部评论(0)