Flink SQL - Flink SQL Client客户端工具使用详解
作者:hangge | 2025-04-27 08:38
在之前的文章中,我们通过 Flink SQL 的形式查询 Flink 中的实时数据,虽然核心的业务逻辑是通过 SQL 实现的,但是这个 SQL 是需要嵌入到 Flink 程序中的,多少还是要写一些 Java 或者 Scala 代码的,并且最后提交执行的时候还是需要对代码编译打包的,这个过程是比较繁琐的,用起来不太方便,特别是在代码开发调试阶段。
(3)当 yarn-session.sh 脚本执行成功之后,默认会在本地的临时文件中写入在 YARN 中创建的 Flink 集群的信息。我们可以查看一下这个临时文件:
(2)从启动日志中也可以看到,读取到了这个文件:/tmp/.yarn-properties-root
(2)最终看到的结果是类似下面这样的,这个 SQL 默认会以流计算的形式去执行:
(2)再次执行同样的 SQL,会发现输出如下内容:
(2)set 命令不区分大小写,都可以正常识别。上面命令执行后输出类似如下内容:
(3)设置 Flink 任务名称。
(6)对于有依赖的多个任务,通过这个配置去选择是同步执行还是异步执行,设置为 true 表示同步执行,false 表示异步执行。
(3)reset 后面不指定参数的时候表示重置这个会话中所有的参数。
(2)在这个 sql 文件中定义一个输入表,输出表、以及具体的计算逻辑,内容如下:
(2)然后将该文件上传到 HDFS 上:
(2)查看 HDFS 上产生的结果,说明任务执行成功。这就是使用 sql-client 执行 SQL 文件的流程,这样其实我们就只需要开发一个 sql 脚本文件即可,把需要的 DDL 语句、DML 语句、以及 SET 命令都定义到里面,直接提交触发一个任务的执行。
其实 Flink SQL 也提供一个类似于 Hive 客户端的功能,那便是 Flink SQL Client 这个客户端工具。本文将对其进行详细介绍。
一、基本介绍
1,什么是Flink SQL Clinet?
(1)Flink 中的 SQL Clinet 客户端工具提供了一种简单的方法来编写、调试和提交 Flink 程序,不需要一行 Java 或者 Scala 代码。
(2)Flink SQL Clinet 客户端工具允许在命令行上从正在运行的 Flink 程序中计算和查询数据。
- 就类似于使用 Hive 中提供的 hive 客户端或者 beeline 客户端命令行一样,直接编写 SQL,提交 SQL 任务去执行,并且获取 SQL 的执行结果。
(3)SQL Client 目前可以运行在 Flink 的 Standalone 集群和 YARN 集群上。
- 不过 SQL Client 目前默认只支持嵌入式独立进程模式,不支持连接到远程 SQL 客户端网关的模式,后期会支持。
2,使用说明
(1)想要启动 SQL Client,需要使用 flink 的 bin 目录下的 sql-client.sh 脚本,这样可以开启一个交互式命令行。
(2)并且在启动的时候还可以通过 -i 参数指定一些 SQL 初始化脚本,在这个初始化脚本中一般定义的是一些 DDL 语句,这样在开启交互式命令行的时候直接就把表初始化好了,不需要多次执行命令创建表,这样比较方便。
(3)当然也可以提前把 DDL、DML 语句都整理好,相当于把建表语句、具体的计算逻辑都提前整理好,放到一个 SQL 文件中,然后通过 -f 参数去执行,这样就不会开启交互式命令行了,会直接按照文件中定义的 SQL 语句去执行了,这种方式类似于 hive -e 的效果。
二、基本用法
1,开启 Flink 集群
(1)通常来说,我们使用的是 Flink ON YARN 模式,所以在使用 Flink SQL 交互式命令行的时候建议也使用 ON YARN 模式。因此首先需要在 YARN 上开启一个 Flink 集群。
- 否则直接开启的 Flink SQL 交互式命令行是没法正常使用的,因为此时默认会连接本地的 Flink Standalone 集群,但是我们这个节点并不属于 Flink 的 Standalone 集群。
(2)我们使用 yarn-session.sh 脚本在 YARN 上开启一个 Flink 集群。
bin/yarn-session.sh -jm 1024m -tm 1024m -d
(3)当 yarn-session.sh 脚本执行成功之后,默认会在本地的临时文件中写入在 YARN 中创建的 Flink 集群的信息。我们可以查看一下这个临时文件:
ll /tmp/.yarn-properties-root more /tmp/.yarn-properties-root

2,启动交互式命令行
(1)我们执行如下命令启动 Flink SQL 交互式命令行的时候,它就会自动识别到前面创建的这个文件,并且连接到文件中指定的 Flink 集群中。
bin/sql-client.sh
(2)从启动日志中也可以看到,读取到了这个文件:/tmp/.yarn-properties-root
提示:那也就是说,sql-client 脚本在启动的时候默认会去找 /tmp/.yarn-properties-root 这个文件,如果服务器上有这个文件,则默认会根据文件中的信息连接 YARN 集群中的 Flink 集群,如果没有这个文件,默认会去连接当前机器上的 Standalone 集群。

3,执行 SQL 命令
(1)我们先执行下面 SQL 命令,这个 SQL 中的(VALUES ('Tom'), ('Jack'), ('Tom'), ('Jessic')) AS t1(name)表示定义了一个表,名为 t1,表中有一个 name 字段,表中有 4 条数据。
注意:在格式化 SQL 语句的时候不要使用制表符,使用制表符直接复制过来会有问题,需要使用空格才可以。
SELECT name, COUNT(*) AS cnt FROM (VALUES ('Tom'), ('Jack'), ('Tom'), ('Jessic')) AS t1(name) GROUP BY name;
(2)最终看到的结果是类似下面这样的,这个 SQL 默认会以流计算的形式去执行:

4,修改执行模式
(1)对于上面那个 SQL 而言,它就是一个离线计算,所以建议在执行之前修改执行模式,需要通过 set 命令修改:
注意:这个修改操作是在当前 Flink SQL 命令行会话中有效,重新进入就失效了。
set 'execution.runtime-mode' = 'batch';
(2)再次执行同样的 SQL,会发现输出如下内容:

三、配置信息设置
1,查看配置信息
(1)可以通过 set 命令查看当前命令行会话中的配置信息。
set;
(2)set 命令不区分大小写,都可以正常识别。上面命令执行后输出类似如下内容:

2,常用的配置参数
(1)通过开启 verbose 可以打印整个异常堆栈信息,相对以前只输出一句错误信息来说,更加容易追踪错误信息。
SET 'sql-client.verbose' = 'true';
- 下图可以看出开启开启前后的区别:

(2)设置批流作业模式,支持 batch 或者 streaming。
SET 'execution.runtime-mode' = 'batch';
(3)设置 Flink 任务名称。
SET 'pipeline.name' = 'my_job_name';

(4)Flink 任务的默认并行度为 1,可以通过下面命令进行调整。
SET 'parallelism.default' = '2';

(5)指定 SQL 任务启动时基于指定状态数据进行恢复。
SET 'execution.savepoint.path' = '/sp-001';
(6)对于有依赖的多个任务,通过这个配置去选择是同步执行还是异步执行,设置为 true 表示同步执行,false 表示异步执行。
- 针对离线任务,任务 A 执行完才能执行任务 B,此时应该设置为 true 同步执行有依赖关系的任务。
SET 'table.dml-sync' = 'true';
3,设置执行结果显示格式
(1)命令行显示的执行结果格式,默认是 table:
SET 'sql-client.execution.result-mode' = 'table';
-
结果格式如下:
(2)执行如下命令使用 changlog:
注意:只有流处理模式可以使用 changelog 这种格式,批处理模式使用会报错,因为批处理的数据不是动态的。
SET 'sql-client.execution.result-mode' = 'changelog';
- 此时结果格式如下,多了一行 op,显示了数据的变化情况,+I、-U、+U。因为此时是流处理模式,可以认为表中的数据是一条一条过来的,来一条数据计算一次,所以会存在更新的情况。

(3)执行如下命令使用 tableau:
SET 'sql-client.execution.result-mode' = 'tableau';
- 结果格式如下,如果数据源源不断产生,则会一直打印。

4,重置参数值
(1)如果想重置之前在会话中设置的参数,可以使用 RESET。当然,也可以支持退出会话,重新连进来也相当于重置了。
(2)reset 后面指定参数的时候表示重置这个参数的值为默认值。
reset 'parallelism.default';
(3)reset 后面不指定参数的时候表示重置这个会话中所有的参数。
reset;
四、使用 sql-client 执行 sql 文件
1,创建 sql 脚本
(1)首先我们创建一个 stu_script.sql 文件:
vi stu_script.sql
(2)在这个 sql 文件中定义一个输入表,输出表、以及具体的计算逻辑,内容如下:
-- 定义输入表 CREATE TABLE file_source( name STRING, age INT )WITH( 'connector' = 'filesystem', 'path' = 'hdfs://192.168.121.128:9000/student.json', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true' ); -- 定义输出表 CREATE TABLE file_sink( age INT, cnt BIGINT )WITH( 'connector' = 'filesystem', 'path' = 'hdfs://192.168.121.128:9000/student_out', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true' ); -- 指定批处理执行模式 SET 'execution.runtime-mode' = 'batch'; -- 指定任务名称 SET 'pipeline.name' = 'student_job'; -- 设置任务并行度 SET 'parallelism.default' = '2'; -- SQL计算逻辑 INSERT INTO file_sink SELECT age, COUNT(*) as cnt FROM file_source GROUP BY age;
2,准备测试数据
(1)首先我们创建一个 student.json 文件,文件内容如下:
[ {"name":"Hangge","age":19}, {"name":"Jack","age":16}, {"name":"Jessic","age":19}, {"name":"Mick","age":20}, {"name":"Joy","age":19}, {"name":"Say","age":20}, {"name":"Kahu","age":19}, {"name":"Hart","age":16} ]
- 其实, 文件内容写成如下形式也是可以的:
{"name":"Hangge","age":19} {"name":"Jack","age":16} {"name":"Jessic","age":19} {"name":"Mick","age":20} {"name":"Joy","age":19} {"name":"Say","age":20} {"name":"Kahu","age":19} {"name":"Hart","age":16}
(2)然后将该文件上传到 HDFS 上:
hdfs dfs -put student.json /
3,运行测试
(1)我们执行如下命令通过 sql-client 脚本去执行前面创建的 sql 文件:
bin/sql-client.sh -f stu_script.sql
(2)查看 HDFS 上产生的结果,说明任务执行成功。这就是使用 sql-client 执行 SQL 文件的流程,这样其实我们就只需要开发一个 sql 脚本文件即可,把需要的 DDL 语句、DML 语句、以及 SET 命令都定义到里面,直接提交触发一个任务的执行。
hdfs dfs -ls /student_out hdfs dfs -cat /student_out/part-*

全部评论(0)