返回 导航

大数据

hangge.com

Flink SQL - 兼容Hive SQL教程详解(调用Hive SQL函数、使用Hive SQL语法)

作者:hangge | 2025-04-25 08:36
    由于 Flink SQLHive SQL 底层使用的 SQL 解析引擎不一样,所以目前 Flink SQL 中无法直接使用 Hive SQL 中的函数和语法。但是在工作中大家可能前期使用 Hive 比较多,对 Hive SQL 的各种用法非常熟悉,现在切换到 Flink SQL 之后,一些 Hive SQL 中支持的语法和函数无法在 Flink SQL 中使用,感觉用起来不太顺手。
    基于这些问题,Flink SQL 提供的有解决方案。
  • 通过 HiveModule 实现 Hive SQL 函数的兼容,主要是针对函数的支持。
  • 通过 HiveDialect 实现 Hive SQL 语法的兼容,主要是针对 DDLDML 语句的支持。
    下面通过样例分别演示这两种解决方案。

一、通过 HiveModule 实现 Hive SQL 函数的兼容

1,基本介绍

(1)Flink SQL 中可以支持多种 Module,针对这些 ModuleFlink SQL 提供了一些通用命令:
  • 通过 LOAD 命令:可以加载 Flink 中内置的或者用户自定义的 Module
  • 通过 UNLOAD:可以卸载 Flink 中内置的或者用户自定义的 Module
(2)HiveModule 属于其中一种实现,通过 HiveModule 可以在 Flink SQL 中使用 Hive 中的函数。

2,准备工作

要使用 HiveModule 吗,首先程序项目中需要先引入对应的依赖:
<!-- hive-connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.12</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.2</version>
</dependency>

3,样例代码

(1)该样例通过加载 HiveModule,从而使用 Hive SQL 中支持的 GET_JSON_OBJECT 函数。下面是 Scala 语言代码:
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.module.hive.HiveModule

object UseHiveModuleSQL {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      //指定执行模式,支持inBatchMode和inStreamingMode
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //查看目前可以使用的Module
    tEnv.executeSql("SHOW FULL MODULES").print()

    //加载Hive Module 使用API
    tEnv.loadModule("hive",new HiveModule("3.1.2"))
    //加载Hive Module 使用DDL
    //tEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')")

    //使用Hive Module,禁用Core Module
    //tEnv.executeSql("USE MODULES hive")

    //使用Core Module,禁用Hive Module
    //tEnv.executeSql("USE MODULES core")

    //使用Hive Module和Core Module,改变加载Module的顺序
    //tEnv.executeSql("USE MODULES hive,core")

    //卸载Hive Module
    //tEnv.executeSql("UNLOAD MODULE hive")

    //GET_JSON_OBJECT是Hive SQL中支持的函数,Flink SQL中是不支持的
    val execSql =
      """
        |SELECT
        |GET_JSON_OBJECT('{"name":"tom","age":20}','$.name') AS name
        |""".stripMargin

    tEnv.executeSql(execSql).print()
  }
}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.module.hive.HiveModule;

public class UseHiveModuleSQLJava {
    public static void main(String[] args) {
        // 创建执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                // 指定执行模式,支持 inBatchMode 和 inStreamingMode
                .inStreamingMode()
                .build();

        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 查看目前可以使用的 Module
        tEnv.executeSql("SHOW FULL MODULES").print();

        // 加载 Hive Module 使用 API
        tEnv.loadModule("hive", new HiveModule("3.1.2"));

        // 如果需要使用 DDL 加载 Hive Module,可以使用以下代码
        // tEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')");

        // 使用 Hive Module,禁用 Core Module
        // tEnv.executeSql("USE MODULES hive");

        // 使用 Core Module,禁用 Hive Module
        // tEnv.executeSql("USE MODULES core");

        // 使用 Hive Module 和 Core Module,改变加载 Module 的顺序
        // tEnv.executeSql("USE MODULES hive,core");

        // 卸载 Hive Module
        // tEnv.executeSql("UNLOAD MODULE hive");

        // GET_JSON_OBJECT 是 Hive SQL 中支持的函数,Flink SQL 中是不支持的
        String execSql = "SELECT " +
                        "GET_JSON_OBJECT('{\"name\":\"tom\",\"age\":20}', '$.name') AS name";

        tEnv.executeSql(execSql).print();
    }
}

(2)运行程序,可以看到控制台输出内容如下,说明我们成功调用了 Hive SQL 中的 GET_JSON_OBJECT 函数。

二、通过 HiveDialect 实现 Hive SQL 语法的兼容

1,基本介绍

(1)如果想要让 Flink SQL 兼容 Hive SQL 的语法,可以将 Flink SQL 的方言改为 hive,默认方言是 default。 如果将 Flink SQL 的方言改为 hive,那么就可以支持 Hive SQL 的各种语法特性了。

(2)注意:想要开启 hive 方言,必须要使用 HiveCatalog。因为开启了 Hive 方言,是可以在 Flink SQL 中使用 Hive SQL 语法去创建 Hive 表的,Hive 表是需要存储在 Hive Metastore 中的,所以需要使用 HiveCatalog

2,准备工作

(1)本文主要来演示一下 HiveCatalog 的使用,因此首先要准备好 Hive 环境,具体可以参考我之前写的文章:

(2)接着,创建一个 hive-site.xml 文件:
vi hive-site.xml
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://192.168.121.128:9083</value>
  </property>
</configuration>
注意:具体的 HDFS 目录需要和后面代码中指定的 hiveConfDir 参数的值保持一致。
 hdfs dfs -mkdir -p /hive-conf
hdfs dfs -put hive-site.xml /hive-conf

(3)然后在上面配置的节点上启动 Hivemetastore 服务:
cd /usr/local/hive/
nohup bin/hive --service metastore -p 9083 2>&1 >/dev/null &

(4)最后,在使用 HiveCatalog 的程序项目中需要先引入对应的依赖:
<!-- hive-connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.12</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.2</version>
    <exclusions>
        <exclusion>
            <artifactId>log4j-slf4j-impl</artifactId>
            <groupId>org.apache.logging.log4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/hangge/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/hangge/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.10.0/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

3,样例代码

(1)该样例我们使用 HiveSQL 建表语句来创建表,下面是 Scala 语言代码:
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableEnvironment}

object UseHiveDialectSQL {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      //指定执行模式,支持inBatchMode和inStreamingMode
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //创建Catalog
    val catalogDDL =
      """
        |CREATE CATALOG myhivecatalog WITH (
        |    'type' = 'hive',
        |    'default-database' = 'default',
        |    'hive-conf-dir' = 'hdfs://192.168.121.128:9000/hive-conf'
        |)
        |""".stripMargin
    tEnv.executeSql(catalogDDL)

    //进入HiveCatalog和default数据库
    tEnv.executeSql("USE CATALOG  myhivecatalog")// 使用DDL
    tEnv.executeSql("USE `default`")// 使用DDL

    //设置使用hive方言【注意:此时必须使用HiveCatalog】
    tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)

    // FlinkSQL建表语句执行会报错
    val flinkSqlDDL =
      """
        |CREATE TABLE IF NOT EXISTS orders_test (
        |    order_id     BIGINT,
        |    price        DECIMAL(10,2),
        |    order_time   TIMESTAMP
        |) WITH (
        |  'connector' = 'datagen',
        |  'rows-per-second' = '1'
        |)
        |""".stripMargin
    //tEnv.executeSql(flinkSqlDDL)

    //HiveSQL建表语句可以正常执行
    val hiveSqlDDL =
      """
        |CREATE TABLE IF NOT EXISTS flink_stu(
        |id INT,
        |name STRING
        |)
        |""".stripMargin
    tEnv.executeSql(hiveSqlDDL)

    //tEnv.executeSql("SELECT * FROM flink_stu").print()
  }
}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

public class UseHiveDialectSQLJava {
    public static void main(String[] args) {
        // 创建执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                // 指定执行模式,支持 inBatchMode 和 inStreamingMode
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 创建 Catalog
        String catalogDDL =
                "CREATE CATALOG myhivecatalog WITH (\n" +
                        "    'type' = 'hive',\n" +
                        "    'default-database' = 'default',\n" +
                        "    'hive-conf-dir' = 'hdfs://192.168.121.128:9000/hive-conf'\n" +
                        ")";
        tEnv.executeSql(catalogDDL);

        // 进入 Hive Catalog 和 default 数据库
        tEnv.executeSql("USE CATALOG myhivecatalog");  // 使用 DDL
        tEnv.executeSql("USE `default`");  // 使用 DDL

        // 设置使用 Hive 方言【注意:此时必须使用 HiveCatalog】
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        // FlinkSQL 建表语句执行会报错
        String flinkSqlDDL =
                "CREATE TABLE IF NOT EXISTS orders_test (\n" +
                        "    order_id     BIGINT,\n" +
                        "    price        DECIMAL(10,2),\n" +
                        "    order_time   TIMESTAMP\n" +
                        ") WITH (\n" +
                        "  'connector' = 'datagen',\n" +
                        "  'rows-per-second' = '1'\n" +
                        ")";
        // tEnv.executeSql(flinkSqlDDL);  // 如果需要执行此语句,取消注释

        // HiveSQL 建表语句可以正常执行
        String hiveSqlDDL =
                "CREATE TABLE IF NOT EXISTS flink_stu(\n" +
                        "id INT,\n" +
                        "name STRING\n" +
                        ")";
        tEnv.executeSql(hiveSqlDDL);

        // tEnv.executeSql("SELECT * FROM flink_stu").print();  // 如果需要打印结果,取消注释
    }
}

(2)上面样例在执行 flinkSqlDDL 语句的时候,程序会报错,提示语法错误,而使用 hiveSqlDDL 语句是可以正常创建表的,并且这个表也会存储在 Hive Metastore 中,到 MySQLtbls 表中是可以看到的。

4,hive 方言创建的表与 default 方言创建的表对比

(1)我们在 Hive 客户端中查看 flink_stu 表的详细信息:
show create table flink_stu;

(2)而我们之前文章(点击查看)创建的 orders 表就是 Flink SQLdefault 方言创建的,在 Hive 客户端中查看它的表结构:
show create table orders;
  • 这种表相当于就是 Flink SQL 自身的表,只是在 Hive 中存储了元数据信息而已,在 Hive 中是无法查询的。
评论

全部评论(0)

回到顶部