Flink SQL - 兼容Hive SQL教程详解(调用Hive SQL函数、使用Hive SQL语法)
作者:hangge | 2025-04-25 08:36
由于 Flink SQL 和 Hive SQL 底层使用的 SQL 解析引擎不一样,所以目前 Flink SQL 中无法直接使用 Hive SQL 中的函数和语法。但是在工作中大家可能前期使用 Hive 比较多,对 Hive SQL 的各种用法非常熟悉,现在切换到 Flink SQL 之后,一些 Hive SQL 中支持的语法和函数无法在 Flink SQL 中使用,感觉用起来不太顺手。
基于这些问题,Flink SQL 提供的有解决方案。
- 通过 HiveModule 实现 Hive SQL 函数的兼容,主要是针对函数的支持。
- 通过 HiveDialect 实现 Hive SQL 语法的兼容,主要是针对 DDL 和 DML 语句的支持。
下面通过样例分别演示这两种解决方案。
一、通过 HiveModule 实现 Hive SQL 函数的兼容
1,基本介绍
(1)Flink SQL 中可以支持多种 Module,针对这些 Module,Flink SQL 提供了一些通用命令:
- 通过 LOAD 命令:可以加载 Flink 中内置的或者用户自定义的 Module。
- 通过 UNLOAD:可以卸载 Flink 中内置的或者用户自定义的 Module。
(2)HiveModule 属于其中一种实现,通过 HiveModule 可以在 Flink SQL 中使用 Hive 中的函数。
2,准备工作
要使用 HiveModule 吗,首先程序项目中需要先引入对应的依赖:
<!-- hive-connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
3,样例代码
(1)该样例通过加载 HiveModule,从而使用 Hive SQL 中支持的 GET_JSON_OBJECT 函数。下面是 Scala 语言代码:
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.module.hive.HiveModule
object UseHiveModuleSQL {
def main(args: Array[String]): Unit = {
//创建执行环境
val settings = EnvironmentSettings
.newInstance()
//指定执行模式,支持inBatchMode和inStreamingMode
.inStreamingMode()
.build()
val tEnv = TableEnvironment.create(settings)
//查看目前可以使用的Module
tEnv.executeSql("SHOW FULL MODULES").print()
//加载Hive Module 使用API
tEnv.loadModule("hive",new HiveModule("3.1.2"))
//加载Hive Module 使用DDL
//tEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')")
//使用Hive Module,禁用Core Module
//tEnv.executeSql("USE MODULES hive")
//使用Core Module,禁用Hive Module
//tEnv.executeSql("USE MODULES core")
//使用Hive Module和Core Module,改变加载Module的顺序
//tEnv.executeSql("USE MODULES hive,core")
//卸载Hive Module
//tEnv.executeSql("UNLOAD MODULE hive")
//GET_JSON_OBJECT是Hive SQL中支持的函数,Flink SQL中是不支持的
val execSql =
"""
|SELECT
|GET_JSON_OBJECT('{"name":"tom","age":20}','$.name') AS name
|""".stripMargin
tEnv.executeSql(execSql).print()
}
}
- 下面是使用 Java 语言实现同样的功能:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.module.hive.HiveModule;
public class UseHiveModuleSQLJava {
public static void main(String[] args) {
// 创建执行环境
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
// 指定执行模式,支持 inBatchMode 和 inStreamingMode
.inStreamingMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// 查看目前可以使用的 Module
tEnv.executeSql("SHOW FULL MODULES").print();
// 加载 Hive Module 使用 API
tEnv.loadModule("hive", new HiveModule("3.1.2"));
// 如果需要使用 DDL 加载 Hive Module,可以使用以下代码
// tEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')");
// 使用 Hive Module,禁用 Core Module
// tEnv.executeSql("USE MODULES hive");
// 使用 Core Module,禁用 Hive Module
// tEnv.executeSql("USE MODULES core");
// 使用 Hive Module 和 Core Module,改变加载 Module 的顺序
// tEnv.executeSql("USE MODULES hive,core");
// 卸载 Hive Module
// tEnv.executeSql("UNLOAD MODULE hive");
// GET_JSON_OBJECT 是 Hive SQL 中支持的函数,Flink SQL 中是不支持的
String execSql = "SELECT " +
"GET_JSON_OBJECT('{\"name\":\"tom\",\"age\":20}', '$.name') AS name";
tEnv.executeSql(execSql).print();
}
}

二、通过 HiveDialect 实现 Hive SQL 语法的兼容
1,基本介绍
(1)如果想要让 Flink SQL 兼容 Hive SQL 的语法,可以将 Flink SQL 的方言改为 hive,默认方言是 default。 如果将 Flink SQL 的方言改为 hive,那么就可以支持 Hive SQL 的各种语法特性了。
(2)注意:想要开启 hive 方言,必须要使用 HiveCatalog。因为开启了 Hive 方言,是可以在 Flink SQL 中使用 Hive SQL 语法去创建 Hive 表的,Hive 表是需要存储在 Hive Metastore 中的,所以需要使用 HiveCatalog。
2,准备工作
(1)本文主要来演示一下 HiveCatalog 的使用,因此首先要准备好 Hive 环境,具体可以参考我之前写的文章:
(2)接着,创建一个 hive-site.xml 文件:
vi hive-site.xml
- 文件内容如下,里面只需要配置 hive.metastore.uris 属性即可:
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://192.168.121.128:9083</value>
</property>
</configuration>
- 然后将 hive-site.xml 这个文件上传到 HDFS 上。
注意:具体的 HDFS 目录需要和后面代码中指定的 hiveConfDir 参数的值保持一致。
hdfs dfs -mkdir -p /hive-conf hdfs dfs -put hive-site.xml /hive-conf
(3)然后在上面配置的节点上启动 Hive 的 metastore 服务:
cd /usr/local/hive/ nohup bin/hive --service metastore -p 9083 2>&1 >/dev/null &
(4)最后,在使用 HiveCatalog 的程序项目中需要先引入对应的依赖:
<!-- hive-connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
<exclusions>
<exclusion>
<artifactId>log4j-slf4j-impl</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
</exclusions>
</dependency>
- 注意:我这里将 log4j-slf4j-imp 排除掉,因为 slf4j-log4j12 和 log4j-slf4j-imp 冲突了,实际上使用的是 org.slf4j.impl.Log4jLoggerFactory(slf4j-log4j12)。如果不排除,运行程序时会提示日志冲突(虽然不影响程序正常运行)。
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/hangge/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/hangge/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.10.0/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

3,样例代码
(1)该样例我们使用 HiveSQL 建表语句来创建表,下面是 Scala 语言代码:
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableEnvironment}
object UseHiveDialectSQL {
def main(args: Array[String]): Unit = {
//创建执行环境
val settings = EnvironmentSettings
.newInstance()
//指定执行模式,支持inBatchMode和inStreamingMode
.inStreamingMode()
.build()
val tEnv = TableEnvironment.create(settings)
//创建Catalog
val catalogDDL =
"""
|CREATE CATALOG myhivecatalog WITH (
| 'type' = 'hive',
| 'default-database' = 'default',
| 'hive-conf-dir' = 'hdfs://192.168.121.128:9000/hive-conf'
|)
|""".stripMargin
tEnv.executeSql(catalogDDL)
//进入HiveCatalog和default数据库
tEnv.executeSql("USE CATALOG myhivecatalog")// 使用DDL
tEnv.executeSql("USE `default`")// 使用DDL
//设置使用hive方言【注意:此时必须使用HiveCatalog】
tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
// FlinkSQL建表语句执行会报错
val flinkSqlDDL =
"""
|CREATE TABLE IF NOT EXISTS orders_test (
| order_id BIGINT,
| price DECIMAL(10,2),
| order_time TIMESTAMP
|) WITH (
| 'connector' = 'datagen',
| 'rows-per-second' = '1'
|)
|""".stripMargin
//tEnv.executeSql(flinkSqlDDL)
//HiveSQL建表语句可以正常执行
val hiveSqlDDL =
"""
|CREATE TABLE IF NOT EXISTS flink_stu(
|id INT,
|name STRING
|)
|""".stripMargin
tEnv.executeSql(hiveSqlDDL)
//tEnv.executeSql("SELECT * FROM flink_stu").print()
}
}
- 下面是使用 Java 语言实现同样功能:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
public class UseHiveDialectSQLJava {
public static void main(String[] args) {
// 创建执行环境
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
// 指定执行模式,支持 inBatchMode 和 inStreamingMode
.inStreamingMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// 创建 Catalog
String catalogDDL =
"CREATE CATALOG myhivecatalog WITH (\n" +
" 'type' = 'hive',\n" +
" 'default-database' = 'default',\n" +
" 'hive-conf-dir' = 'hdfs://192.168.121.128:9000/hive-conf'\n" +
")";
tEnv.executeSql(catalogDDL);
// 进入 Hive Catalog 和 default 数据库
tEnv.executeSql("USE CATALOG myhivecatalog"); // 使用 DDL
tEnv.executeSql("USE `default`"); // 使用 DDL
// 设置使用 Hive 方言【注意:此时必须使用 HiveCatalog】
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// FlinkSQL 建表语句执行会报错
String flinkSqlDDL =
"CREATE TABLE IF NOT EXISTS orders_test (\n" +
" order_id BIGINT,\n" +
" price DECIMAL(10,2),\n" +
" order_time TIMESTAMP\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second' = '1'\n" +
")";
// tEnv.executeSql(flinkSqlDDL); // 如果需要执行此语句,取消注释
// HiveSQL 建表语句可以正常执行
String hiveSqlDDL =
"CREATE TABLE IF NOT EXISTS flink_stu(\n" +
"id INT,\n" +
"name STRING\n" +
")";
tEnv.executeSql(hiveSqlDDL);
// tEnv.executeSql("SELECT * FROM flink_stu").print(); // 如果需要打印结果,取消注释
}
}

4,hive 方言创建的表与 default 方言创建的表对比
(1)我们在 Hive 客户端中查看 flink_stu 表的详细信息:
show create table flink_stu;
-
可以发现这个使用 hive 方言创建的表就是常规的 Hive 表。

show create table orders;
- 这种表相当于就是 Flink SQL 自身的表,只是在 Hive 中存储了元数据信息而已,在 Hive 中是无法查询的。

全部评论(0)