Flink SQL - DDL语句使用详解1(读取hdfs文件创建表、计算输出至控制台)
作者:hangge | 2025-04-16 09:03
DDL 语句主要涉及到创建表、修改表和删除表、但是针对 Flink SQL 而言,在工作中主要涉及的操作就是创建表。下面我将通过案例来演示一下如何使用 Flink SQL 语句创建表。
一、FileSystem(Source) + Print(Sink)案例
1,准备工作
首先我们需要在项目的 pom.xml 中添加 Flink SQL 相关的依赖,以及 file-connector、flink-json 和 hadoop 的依赖。
提示:因为这个案例中需要使用 Flink SQL 操作文件,需要用到 file-connector 和 hadoop 的依赖,并且文件中的数据格式是 json 格式的,还需要用到 flink-json 的依赖。
<!-- Flink clients --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>1.16.0</version> </dependency> <!-- Flink SQL --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.12</artifactId> <version>1.16.0</version> </dependency> <!-- filesystem-connector --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-files</artifactId> <version>1.16.0</version> </dependency> <!-- flink-json --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>1.16.0</version> </dependency> <!-- hadoop依赖 --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.2.0</version> </dependency>
2,样例代码
(1)下面是 Scala 语言代码,具体逻辑为:
- 创建表 file_source,使用 filesystem 连接器,表示数据来源是文件系统。
- 创建表 print_sink,使用 print 连接器,表述输出数据到标准输出流(控制台)。
- 从 file_source 表读取数据,按 age 字段分组。统计每个 age 的记录数量,结果列为 cnt。查询结果插入到 print_sink 表,最终打印到控制台。
注意:在 FlinkSQL 中创建表的时候,需要注意,表名区分大小写。
import org.apache.flink.table.api._ object FileSystemPrintSQL { def main(args: Array[String]): Unit = { //创建执行环境 val settings = EnvironmentSettings .newInstance() //指定执行模式,支持inBatchMode和inStreamingMode .inBatchMode() .build() val tEnv = TableEnvironment.create(settings) //创建输入表 val inTableSql = """ |CREATE TABLE file_source( | name STRING, | age INT |)WITH( | 'connector' = 'filesystem', | 'path' = 'hdfs://192.168.121.128:9000/student.json', | 'format' = 'json', | 'json.fail-on-missing-field' = 'false', | 'json.ignore-parse-errors' = 'true' |) |""".stripMargin tEnv.executeSql(inTableSql) //创建输出表 val outTableSql = """ |CREATE TABLE print_sink( | age INT, | cnt BIGINT |)WITH( | 'connector' = 'print' |) |""".stripMargin tEnv.executeSql(outTableSql) //业务逻辑 val execSql = """ |INSERT INTO print_sink |SELECT | age, | COUNT(*) AS cnt |FROM file_source |GROUP BY age |""".stripMargin tEnv.executeSql(execSql) } }
(2)下面是实现同样功能的 Java 语言代码:
import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; public class FileSystemPrintSQLJava { public static void main(String[] args) { // 创建执行环境 EnvironmentSettings settings = EnvironmentSettings .newInstance() // 指定执行模式为批处理模式 .inBatchMode() .build(); TableEnvironment tEnv = TableEnvironment.create(settings); // 创建输入表 String inTableSql = "CREATE TABLE file_source( " + " name STRING, " + " age INT " + ")WITH( " + " 'connector' = 'filesystem', " + " 'path' = 'hdfs://192.168.121.128:9000/student.json', " + " 'format' = 'json', " + " 'json.fail-on-missing-field' = 'false', " + " 'json.ignore-parse-errors' = 'true' " + ")"; tEnv.executeSql(inTableSql); // 创建输出表 String outTableSql = "CREATE TABLE print_sink( " + " age INT, " + " cnt BIGINT " + ")WITH( " + " 'connector' = 'print' " + ")"; tEnv.executeSql(outTableSql); // 业务逻辑 String execSql = "INSERT INTO print_sink " + "SELECT " + " age, " + " COUNT(*) AS cnt " + "FROM file_source " + "GROUP BY age"; tEnv.executeSql(execSql); } }
3,运行测试
(1)首先我们在 HDFS 中添加一个 student.json 文件,文件内容如下:
[ {"name":"Hangge","age":19}, {"name":"Jack","age":16}, {"name":"Jessic","age":19}, {"name":"Mick","age":20}, {"name":"Joy","age":19}, {"name":"Say","age":20}, {"name":"Kahu","age":19}, {"name":"Hart","age":16} ]
- 其实, 文件内容写成如下形式也是可以的:
{"name":"Hangge","age":19} {"name":"Jack","age":16} {"name":"Jessic","age":19} {"name":"Mick","age":20} {"name":"Joy","age":19} {"name":"Say","age":20} {"name":"Kahu","age":19} {"name":"Hart","age":16}
(2)执行程序,由于我们代码中设定的是 Batch 批处理模式,则直接返回最终的结果。可以看到控制台输出内容如下:

(3)如果是 Steaming 流处理模式,会一条一条处理,这样可以看到每次计算的动态结果。我们把代码中的执行模式修改为 inStreamingMode():
//创建执行环境 val settings = EnvironmentSettings .newInstance() //指定执行模式,支持inBatchMode和inStreamingMode .inStreamingMode() .build() val tEnv = TableEnvironment.create(settings)
(4)再次执行程序,可以看到控制台输出内容如下。此时可以看到数据的变化情况,最终结果和批处理的结果是一样的。

附:删除表
1,什么情况下需要删除表?
(1)针对本文的样例场景其实不需要删除表,因为这个表是临时的,只在当前 Flink 任务内部有效,任务执行结果后,我们定义的表就没有了,下次想要使用的话需要重启任务,这样就会自动重新创建表。
(2)如果我们能够将表的元数据信息,也就是表的 schema 信息持久化存储起来,那么删除表就有意义了
2,如何删除表?
如果在 Flink 任务中需要去删除某个表,可以使用 drop table 语句。
tEnv.executeSql("DROP TABLE print_sink")
全部评论(0)