Flink SQL - DDL语句使用详解1(读取hdfs文件创建表、计算输出至控制台)
作者:hangge | 2025-04-16 09:03
DDL 语句主要涉及到创建表、修改表和删除表、但是针对 Flink SQL 而言,在工作中主要涉及的操作就是创建表。下面我将通过案例来演示一下如何使用 Flink SQL 语句创建表。
一、FileSystem(Source) + Print(Sink)案例
1,准备工作
首先我们需要在项目的 pom.xml 中添加 Flink SQL 相关的依赖,以及 file-connector、flink-json 和 hadoop 的依赖。
提示:因为这个案例中需要使用 Flink SQL 操作文件,需要用到 file-connector 和 hadoop 的依赖,并且文件中的数据格式是 json 格式的,还需要用到 flink-json 的依赖。
<!-- Flink clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.16.0</version>
</dependency>
<!-- Flink SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.16.0</version>
</dependency>
<!-- filesystem-connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>1.16.0</version>
</dependency>
<!-- flink-json -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.16.0</version>
</dependency>
<!-- hadoop依赖 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.0</version>
</dependency>
2,样例代码
(1)下面是 Scala 语言代码,具体逻辑为:
- 创建表 file_source,使用 filesystem 连接器,表示数据来源是文件系统。
- 创建表 print_sink,使用 print 连接器,表述输出数据到标准输出流(控制台)。
- 从 file_source 表读取数据,按 age 字段分组。统计每个 age 的记录数量,结果列为 cnt。查询结果插入到 print_sink 表,最终打印到控制台。
注意:在 FlinkSQL 中创建表的时候,需要注意,表名区分大小写。
import org.apache.flink.table.api._
object FileSystemPrintSQL {
def main(args: Array[String]): Unit = {
//创建执行环境
val settings = EnvironmentSettings
.newInstance()
//指定执行模式,支持inBatchMode和inStreamingMode
.inBatchMode()
.build()
val tEnv = TableEnvironment.create(settings)
//创建输入表
val inTableSql =
"""
|CREATE TABLE file_source(
| name STRING,
| age INT
|)WITH(
| 'connector' = 'filesystem',
| 'path' = 'hdfs://192.168.121.128:9000/student.json',
| 'format' = 'json',
| 'json.fail-on-missing-field' = 'false',
| 'json.ignore-parse-errors' = 'true'
|)
|""".stripMargin
tEnv.executeSql(inTableSql)
//创建输出表
val outTableSql =
"""
|CREATE TABLE print_sink(
| age INT,
| cnt BIGINT
|)WITH(
| 'connector' = 'print'
|)
|""".stripMargin
tEnv.executeSql(outTableSql)
//业务逻辑
val execSql =
"""
|INSERT INTO print_sink
|SELECT
| age,
| COUNT(*) AS cnt
|FROM file_source
|GROUP BY age
|""".stripMargin
tEnv.executeSql(execSql)
}
}
(2)下面是实现同样功能的 Java 语言代码:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
public class FileSystemPrintSQLJava {
public static void main(String[] args) {
// 创建执行环境
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
// 指定执行模式为批处理模式
.inBatchMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// 创建输入表
String inTableSql =
"CREATE TABLE file_source( " +
" name STRING, " +
" age INT " +
")WITH( " +
" 'connector' = 'filesystem', " +
" 'path' = 'hdfs://192.168.121.128:9000/student.json', " +
" 'format' = 'json', " +
" 'json.fail-on-missing-field' = 'false', " +
" 'json.ignore-parse-errors' = 'true' " +
")";
tEnv.executeSql(inTableSql);
// 创建输出表
String outTableSql =
"CREATE TABLE print_sink( " +
" age INT, " +
" cnt BIGINT " +
")WITH( " +
" 'connector' = 'print' " +
")";
tEnv.executeSql(outTableSql);
// 业务逻辑
String execSql =
"INSERT INTO print_sink " +
"SELECT " +
" age, " +
" COUNT(*) AS cnt " +
"FROM file_source " +
"GROUP BY age";
tEnv.executeSql(execSql);
}
}
3,运行测试
(1)首先我们在 HDFS 中添加一个 student.json 文件,文件内容如下:
[
{"name":"Hangge","age":19},
{"name":"Jack","age":16},
{"name":"Jessic","age":19},
{"name":"Mick","age":20},
{"name":"Joy","age":19},
{"name":"Say","age":20},
{"name":"Kahu","age":19},
{"name":"Hart","age":16}
]
- 其实, 文件内容写成如下形式也是可以的:
{"name":"Hangge","age":19}
{"name":"Jack","age":16}
{"name":"Jessic","age":19}
{"name":"Mick","age":20}
{"name":"Joy","age":19}
{"name":"Say","age":20}
{"name":"Kahu","age":19}
{"name":"Hart","age":16}
(2)执行程序,由于我们代码中设定的是 Batch 批处理模式,则直接返回最终的结果。可以看到控制台输出内容如下:

(3)如果是 Steaming 流处理模式,会一条一条处理,这样可以看到每次计算的动态结果。我们把代码中的执行模式修改为 inStreamingMode():
//创建执行环境 val settings = EnvironmentSettings .newInstance() //指定执行模式,支持inBatchMode和inStreamingMode .inStreamingMode() .build() val tEnv = TableEnvironment.create(settings)
(4)再次执行程序,可以看到控制台输出内容如下。此时可以看到数据的变化情况,最终结果和批处理的结果是一样的。

附:删除表
1,什么情况下需要删除表?
(1)针对本文的样例场景其实不需要删除表,因为这个表是临时的,只在当前 Flink 任务内部有效,任务执行结果后,我们定义的表就没有了,下次想要使用的话需要重启任务,这样就会自动重新创建表。
(2)如果我们能够将表的元数据信息,也就是表的 schema 信息持久化存储起来,那么删除表就有意义了
2,如何删除表?
如果在 Flink 任务中需要去删除某个表,可以使用 drop table 语句。
tEnv.executeSql("DROP TABLE print_sink")
全部评论(0)