Flink SQL - 兼容Hive SQL教程详解(调用Hive SQL函数、使用Hive SQL语法)
作者:hangge | 2025-04-25 08:36
由于 Flink SQL 和 Hive SQL 底层使用的 SQL 解析引擎不一样,所以目前 Flink SQL 中无法直接使用 Hive SQL 中的函数和语法。但是在工作中大家可能前期使用 Hive 比较多,对 Hive SQL 的各种用法非常熟悉,现在切换到 Flink SQL 之后,一些 Hive SQL 中支持的语法和函数无法在 Flink SQL 中使用,感觉用起来不太顺手。
基于这些问题,Flink SQL 提供的有解决方案。
- 通过 HiveModule 实现 Hive SQL 函数的兼容,主要是针对函数的支持。
- 通过 HiveDialect 实现 Hive SQL 语法的兼容,主要是针对 DDL 和 DML 语句的支持。
下面通过样例分别演示这两种解决方案。
一、通过 HiveModule 实现 Hive SQL 函数的兼容
1,基本介绍
(1)Flink SQL 中可以支持多种 Module,针对这些 Module,Flink SQL 提供了一些通用命令:
- 通过 LOAD 命令:可以加载 Flink 中内置的或者用户自定义的 Module。
- 通过 UNLOAD:可以卸载 Flink 中内置的或者用户自定义的 Module。
(2)HiveModule 属于其中一种实现,通过 HiveModule 可以在 Flink SQL 中使用 Hive 中的函数。
2,准备工作
要使用 HiveModule 吗,首先程序项目中需要先引入对应的依赖:
<!-- hive-connector --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive_2.12</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>3.1.2</version> </dependency>
3,样例代码
(1)该样例通过加载 HiveModule,从而使用 Hive SQL 中支持的 GET_JSON_OBJECT 函数。下面是 Scala 语言代码:
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment} import org.apache.flink.table.module.hive.HiveModule object UseHiveModuleSQL { def main(args: Array[String]): Unit = { //创建执行环境 val settings = EnvironmentSettings .newInstance() //指定执行模式,支持inBatchMode和inStreamingMode .inStreamingMode() .build() val tEnv = TableEnvironment.create(settings) //查看目前可以使用的Module tEnv.executeSql("SHOW FULL MODULES").print() //加载Hive Module 使用API tEnv.loadModule("hive",new HiveModule("3.1.2")) //加载Hive Module 使用DDL //tEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')") //使用Hive Module,禁用Core Module //tEnv.executeSql("USE MODULES hive") //使用Core Module,禁用Hive Module //tEnv.executeSql("USE MODULES core") //使用Hive Module和Core Module,改变加载Module的顺序 //tEnv.executeSql("USE MODULES hive,core") //卸载Hive Module //tEnv.executeSql("UNLOAD MODULE hive") //GET_JSON_OBJECT是Hive SQL中支持的函数,Flink SQL中是不支持的 val execSql = """ |SELECT |GET_JSON_OBJECT('{"name":"tom","age":20}','$.name') AS name |""".stripMargin tEnv.executeSql(execSql).print() } }
- 下面是使用 Java 语言实现同样的功能:
import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.module.hive.HiveModule; public class UseHiveModuleSQLJava { public static void main(String[] args) { // 创建执行环境 EnvironmentSettings settings = EnvironmentSettings .newInstance() // 指定执行模式,支持 inBatchMode 和 inStreamingMode .inStreamingMode() .build(); TableEnvironment tEnv = TableEnvironment.create(settings); // 查看目前可以使用的 Module tEnv.executeSql("SHOW FULL MODULES").print(); // 加载 Hive Module 使用 API tEnv.loadModule("hive", new HiveModule("3.1.2")); // 如果需要使用 DDL 加载 Hive Module,可以使用以下代码 // tEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')"); // 使用 Hive Module,禁用 Core Module // tEnv.executeSql("USE MODULES hive"); // 使用 Core Module,禁用 Hive Module // tEnv.executeSql("USE MODULES core"); // 使用 Hive Module 和 Core Module,改变加载 Module 的顺序 // tEnv.executeSql("USE MODULES hive,core"); // 卸载 Hive Module // tEnv.executeSql("UNLOAD MODULE hive"); // GET_JSON_OBJECT 是 Hive SQL 中支持的函数,Flink SQL 中是不支持的 String execSql = "SELECT " + "GET_JSON_OBJECT('{\"name\":\"tom\",\"age\":20}', '$.name') AS name"; tEnv.executeSql(execSql).print(); } }

二、通过 HiveDialect 实现 Hive SQL 语法的兼容
1,基本介绍
(1)如果想要让 Flink SQL 兼容 Hive SQL 的语法,可以将 Flink SQL 的方言改为 hive,默认方言是 default。 如果将 Flink SQL 的方言改为 hive,那么就可以支持 Hive SQL 的各种语法特性了。
(2)注意:想要开启 hive 方言,必须要使用 HiveCatalog。因为开启了 Hive 方言,是可以在 Flink SQL 中使用 Hive SQL 语法去创建 Hive 表的,Hive 表是需要存储在 Hive Metastore 中的,所以需要使用 HiveCatalog。
2,准备工作
(1)本文主要来演示一下 HiveCatalog 的使用,因此首先要准备好 Hive 环境,具体可以参考我之前写的文章:
(2)接着,创建一个 hive-site.xml 文件:
vi hive-site.xml
- 文件内容如下,里面只需要配置 hive.metastore.uris 属性即可:
<?xml version="1.0" encoding="UTF-8" standalone="no"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>hive.metastore.uris</name> <value>thrift://192.168.121.128:9083</value> </property> </configuration>
- 然后将 hive-site.xml 这个文件上传到 HDFS 上。
注意:具体的 HDFS 目录需要和后面代码中指定的 hiveConfDir 参数的值保持一致。
hdfs dfs -mkdir -p /hive-conf hdfs dfs -put hive-site.xml /hive-conf
(3)然后在上面配置的节点上启动 Hive 的 metastore 服务:
cd /usr/local/hive/ nohup bin/hive --service metastore -p 9083 2>&1 >/dev/null &
(4)最后,在使用 HiveCatalog 的程序项目中需要先引入对应的依赖:
<!-- hive-connector --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive_2.12</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>3.1.2</version> <exclusions> <exclusion> <artifactId>log4j-slf4j-impl</artifactId> <groupId>org.apache.logging.log4j</groupId> </exclusion> </exclusions> </dependency>
- 注意:我这里将 log4j-slf4j-imp 排除掉,因为 slf4j-log4j12 和 log4j-slf4j-imp 冲突了,实际上使用的是 org.slf4j.impl.Log4jLoggerFactory(slf4j-log4j12)。如果不排除,运行程序时会提示日志冲突(虽然不影响程序正常运行)。
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/hangge/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/hangge/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.10.0/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

3,样例代码
(1)该样例我们使用 HiveSQL 建表语句来创建表,下面是 Scala 语言代码:
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableEnvironment} object UseHiveDialectSQL { def main(args: Array[String]): Unit = { //创建执行环境 val settings = EnvironmentSettings .newInstance() //指定执行模式,支持inBatchMode和inStreamingMode .inStreamingMode() .build() val tEnv = TableEnvironment.create(settings) //创建Catalog val catalogDDL = """ |CREATE CATALOG myhivecatalog WITH ( | 'type' = 'hive', | 'default-database' = 'default', | 'hive-conf-dir' = 'hdfs://192.168.121.128:9000/hive-conf' |) |""".stripMargin tEnv.executeSql(catalogDDL) //进入HiveCatalog和default数据库 tEnv.executeSql("USE CATALOG myhivecatalog")// 使用DDL tEnv.executeSql("USE `default`")// 使用DDL //设置使用hive方言【注意:此时必须使用HiveCatalog】 tEnv.getConfig.setSqlDialect(SqlDialect.HIVE) // FlinkSQL建表语句执行会报错 val flinkSqlDDL = """ |CREATE TABLE IF NOT EXISTS orders_test ( | order_id BIGINT, | price DECIMAL(10,2), | order_time TIMESTAMP |) WITH ( | 'connector' = 'datagen', | 'rows-per-second' = '1' |) |""".stripMargin //tEnv.executeSql(flinkSqlDDL) //HiveSQL建表语句可以正常执行 val hiveSqlDDL = """ |CREATE TABLE IF NOT EXISTS flink_stu( |id INT, |name STRING |) |""".stripMargin tEnv.executeSql(hiveSqlDDL) //tEnv.executeSql("SELECT * FROM flink_stu").print() } }
- 下面是使用 Java 语言实现同样功能:
import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.TableEnvironment; public class UseHiveDialectSQLJava { public static void main(String[] args) { // 创建执行环境 EnvironmentSettings settings = EnvironmentSettings .newInstance() // 指定执行模式,支持 inBatchMode 和 inStreamingMode .inStreamingMode() .build(); TableEnvironment tEnv = TableEnvironment.create(settings); // 创建 Catalog String catalogDDL = "CREATE CATALOG myhivecatalog WITH (\n" + " 'type' = 'hive',\n" + " 'default-database' = 'default',\n" + " 'hive-conf-dir' = 'hdfs://192.168.121.128:9000/hive-conf'\n" + ")"; tEnv.executeSql(catalogDDL); // 进入 Hive Catalog 和 default 数据库 tEnv.executeSql("USE CATALOG myhivecatalog"); // 使用 DDL tEnv.executeSql("USE `default`"); // 使用 DDL // 设置使用 Hive 方言【注意:此时必须使用 HiveCatalog】 tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); // FlinkSQL 建表语句执行会报错 String flinkSqlDDL = "CREATE TABLE IF NOT EXISTS orders_test (\n" + " order_id BIGINT,\n" + " price DECIMAL(10,2),\n" + " order_time TIMESTAMP\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1'\n" + ")"; // tEnv.executeSql(flinkSqlDDL); // 如果需要执行此语句,取消注释 // HiveSQL 建表语句可以正常执行 String hiveSqlDDL = "CREATE TABLE IF NOT EXISTS flink_stu(\n" + "id INT,\n" + "name STRING\n" + ")"; tEnv.executeSql(hiveSqlDDL); // tEnv.executeSql("SELECT * FROM flink_stu").print(); // 如果需要打印结果,取消注释 } }

4,hive 方言创建的表与 default 方言创建的表对比
(1)我们在 Hive 客户端中查看 flink_stu 表的详细信息:
show create table flink_stu;
-
可以发现这个使用 hive 方言创建的表就是常规的 Hive 表。
show create table orders;
- 这种表相当于就是 Flink SQL 自身的表,只是在 Hive 中存储了元数据信息而已,在 Hive 中是无法查询的。

全部评论(0)