返回 导航

大数据

hangge.com

Flink SQL - Flink SQL Client客户端工具使用详解

作者:hangge | 2025-04-27 08:38
    在之前的文章中,我们通过 Flink SQL 的形式查询 Flink 中的实时数据,虽然核心的业务逻辑是通过 SQL 实现的,但是这个 SQL 是需要嵌入到 Flink 程序中的,多少还是要写一些 Java 或者 Scala 代码的,并且最后提交执行的时候还是需要对代码编译打包的,这个过程是比较繁琐的,用起来不太方便,特别是在代码开发调试阶段。
    其实 Flink SQL 也提供一个类似于 Hive 客户端的功能,那便是 Flink SQL Client 这个客户端工具。本文将对其进行详细介绍。

一、基本介绍

1,什么是Flink SQL Clinet?

(1)Flink 中的 SQL Clinet 客户端工具提供了一种简单的方法来编写、调试和提交 Flink 程序,不需要一行 Java 或者 Scala 代码。

(2)Flink SQL Clinet 客户端工具允许在命令行上从正在运行的 Flink 程序中计算和查询数据。 
  • 就类似于使用 Hive 中提供的 hive 客户端或者 beeline 客户端命令行一样,直接编写 SQL,提交 SQL 任务去执行,并且获取 SQL 的执行结果。

(3)SQL Client 目前可以运行在 FlinkStandalone 集群和 YARN 集群上。
  • 不过 SQL Client 目前默认只支持嵌入式独立进程模式,不支持连接到远程 SQL 客户端网关的模式,后期会支持。

2,使用说明

(1)想要启动 SQL Client,需要使用 flinkbin 目录下的 sql-client.sh 脚本,这样可以开启一个交互式命令行。

(2)并且在启动的时候还可以通过 -i 参数指定一些 SQL 初始化脚本,在这个初始化脚本中一般定义的是一些 DDL 语句,这样在开启交互式命令行的时候直接就把表初始化好了,不需要多次执行命令创建表,这样比较方便。

(3)当然也可以提前把 DDLDML 语句都整理好,相当于把建表语句、具体的计算逻辑都提前整理好,放到一个 SQL 文件中,然后通过 -f 参数去执行,这样就不会开启交互式命令行了,会直接按照文件中定义的 SQL 语句去执行了,这种方式类似于 hive -e 的效果。

二、基本用法

1,开启 Flink 集群

(1)通常来说,我们使用的是 Flink ON YARN 模式,所以在使用 Flink SQL 交互式命令行的时候建议也使用 ON YARN 模式。因此首先需要在 YARN 上开启一个 Flink 集群。
  • 否则直接开启的 Flink SQL 交互式命令行是没法正常使用的,因为此时默认会连接本地的 Flink Standalone 集群,但是我们这个节点并不属于 FlinkStandalone 集群。

(2)我们使用 yarn-session.sh 脚本在 YARN 上开启一个 Flink 集群。
bin/yarn-session.sh -jm 1024m -tm 1024m -d

(3)当 yarn-session.sh 脚本执行成功之后,默认会在本地的临时文件中写入在 YARN 中创建的 Flink 集群的信息。我们可以查看一下这个临时文件:
ll /tmp/.yarn-properties-root
more /tmp/.yarn-properties-root

2,启动交互式命令行

(1)我们执行如下命令启动 Flink SQL 交互式命令行的时候,它就会自动识别到前面创建的这个文件,并且连接到文件中指定的 Flink 集群中。
bin/sql-client.sh

(2)从启动日志中也可以看到,读取到了这个文件:/tmp/.yarn-properties-root
提示:那也就是说,sql-client 脚本在启动的时候默认会去找 /tmp/.yarn-properties-root 这个文件,如果服务器上有这个文件,则默认会根据文件中的信息连接 YARN 集群中的 Flink 集群,如果没有这个文件,默认会去连接当前机器上的 Standalone 集群。

3,执行 SQL 命令

(1)我们先执行下面 SQL 命令,这个 SQL 中的(VALUES ('Tom'), ('Jack'), ('Tom'), ('Jessic')) AS t1(name)表示定义了一个表,名为 t1,表中有一个 name 字段,表中有 4 条数据。
注意:在格式化 SQL 语句的时候不要使用制表符,使用制表符直接复制过来会有问题,需要使用空格才可以。
SELECT
  name,
  COUNT(*) AS cnt
FROM
  (VALUES ('Tom'), ('Jack'), ('Tom'), ('Jessic')) AS t1(name)
GROUP BY name;

(2)最终看到的结果是类似下面这样的,这个 SQL 默认会以流计算的形式去执行:

4,修改执行模式

(1)对于上面那个 SQL 而言,它就是一个离线计算,所以建议在执行之前修改执行模式,需要通过 set 命令修改:
注意:这个修改操作是在当前 Flink SQL 命令行会话中有效,重新进入就失效了。
set 'execution.runtime-mode' = 'batch';

(2)再次执行同样的 SQL,会发现输出如下内容:

三、配置信息设置

1,查看配置信息

(1)可以通过 set 命令查看当前命令行会话中的配置信息。
set;

(2)set 命令不区分大小写,都可以正常识别。上面命令执行后输出类似如下内容:

2,常用的配置参数

(1)通过开启 verbose 可以打印整个异常堆栈信息,相对以前只输出一句错误信息来说,更加容易追踪错误信息。
SET 'sql-client.verbose' = 'true';

(2)设置批流作业模式,支持 batch 或者 streaming
SET 'execution.runtime-mode' = 'batch';

(3)设置 Flink 任务名称。
SET 'pipeline.name' = 'my_job_name';

(4)Flink 任务的默认并行度为 1,可以通过下面命令进行调整。
SET 'parallelism.default' = '2';

(5)指定 SQL 任务启动时基于指定状态数据进行恢复。
SET 'execution.savepoint.path' = '/sp-001';

(6)对于有依赖的多个任务,通过这个配置去选择是同步执行还是异步执行,设置为 true 表示同步执行,false 表示异步执行。
  • 针对离线任务,任务 A 执行完才能执行任务 B,此时应该设置为 true 同步执行有依赖关系的任务。
SET 'table.dml-sync' = 'true';

3,设置执行结果显示格式

(1)命令行显示的执行结果格式,默认是 table
SET 'sql-client.execution.result-mode' = 'table';
(2)执行如下命令使用 changlog
注意:只有流处理模式可以使用 changelog 这种格式,批处理模式使用会报错,因为批处理的数据不是动态的。
SET 'sql-client.execution.result-mode' = 'changelog';

(3)执行如下命令使用 tableau
SET 'sql-client.execution.result-mode' = 'tableau';

4,重置参数值

(1)如果想重置之前在会话中设置的参数,可以使用 RESET。当然,也可以支持退出会话,重新连进来也相当于重置了。

(2)reset 后面指定参数的时候表示重置这个参数的值为默认值。
reset 'parallelism.default';

(3)reset 后面不指定参数的时候表示重置这个会话中所有的参数。
reset;

四、使用 sql-client 执行 sql 文件

1,创建 sql 脚本

(1)首先我们创建一个 stu_script.sql 文件:
vi stu_script.sql

(2)在这个 sql 文件中定义一个输入表,输出表、以及具体的计算逻辑,内容如下:
-- 定义输入表
CREATE TABLE file_source(
  name STRING,
  age INT
)WITH(
  'connector' = 'filesystem', 
  'path' = 'hdfs://192.168.121.128:9000/student.json',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);

-- 定义输出表
CREATE TABLE file_sink(
  age INT,
  cnt BIGINT
)WITH(
  'connector' = 'filesystem', 
  'path' = 'hdfs://192.168.121.128:9000/student_out',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);

-- 指定批处理执行模式
SET 'execution.runtime-mode' = 'batch';

-- 指定任务名称
SET 'pipeline.name' = 'student_job';

-- 设置任务并行度
SET 'parallelism.default' = '2';

-- SQL计算逻辑
INSERT INTO file_sink 
SELECT 
  age,
  COUNT(*) as cnt
FROM file_source
GROUP BY age;

2,准备测试数据

(1)首先我们创建一个 student.json 文件,文件内容如下:
[
  {"name":"Hangge","age":19},
  {"name":"Jack","age":16},
  {"name":"Jessic","age":19},
  {"name":"Mick","age":20},
  {"name":"Joy","age":19},
  {"name":"Say","age":20},
  {"name":"Kahu","age":19},
  {"name":"Hart","age":16}
]
{"name":"Hangge","age":19}
{"name":"Jack","age":16}
{"name":"Jessic","age":19}
{"name":"Mick","age":20}
{"name":"Joy","age":19}
{"name":"Say","age":20}
{"name":"Kahu","age":19}
{"name":"Hart","age":16}

(2)然后将该文件上传到 HDFS 上:
hdfs dfs -put student.json /

3,运行测试

(1)我们执行如下命令通过 sql-client 脚本去执行前面创建的 sql 文件:
bin/sql-client.sh -f stu_script.sql 

(2)查看 HDFS 上产生的结果,说明任务执行成功。这就是使用 sql-client 执行 SQL 文件的流程,这样其实我们就只需要开发一个 sql 脚本文件即可,把需要的 DDL 语句、DML 语句、以及 SET 命令都定义到里面,直接提交触发一个任务的执行。
hdfs dfs -ls /student_out
hdfs dfs -cat /student_out/part-*
评论

全部评论(0)

回到顶部