Flink SQL - DDL语句使用详解2(读取Kafka创建表、计算输出至Kafka)
作者:hangge | 2025-04-17 08:44
数据源和目的地都是 Kafka 这种情况再实际工作中比较常见,可以实现数据的实时计算。下面通过样例进行演示。
一、Kafka(Source) + Kafka(Sink)案例
1,准备工作
首先我们需要在项目的 pom.xml 中添加 Flink SQL 相关的依赖,以及 flink-connector-kafka、flink-json 依赖。
提示:Kafka 这个 Connector 需要使用 flink-connector-kafka 这个依赖,在操作 json 格式数据的时候需要用到 flink-json 这个依赖。
<!-- Flink clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.16.0</version>
</dependency>
<!-- Flink SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.16.0</version>
</dependency>
<!-- flink-json -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.16.0</version>
</dependency>
<!-- flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.16.0</version>
</dependency>
2,样例代码
(1)下面是 Scala 语言代码,具体逻辑为:
- 使用 SQL 定义了一个名为 kafka_source 的输入表,绑定到 Kafka 的主题 dt001。
- 使用 SQL 定义了一个名为 kafka_sink 的输出表,绑定到 Kafka 的主题 dt002。
- 从 kafka_source 表中读取数据。 过滤条件:仅保留 age > 10 的记录。 将符合条件的记录插入到 kafka_sink 表。
注意:在 FlinkSQL 中创建表的时候,需要注意,表名区分大小写。
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
object KafkaSourceSinkSQL {
def main(args: Array[String]): Unit = {
//创建执行环境
val settings = EnvironmentSettings
.newInstance()
//指定执行模式,支持inBatchMode和inStreamingMode
.inStreamingMode()
.build()
val tEnv = TableEnvironment.create(settings)
//创建输入表
val inTableSql =
"""
|CREATE TABLE kafka_source(
| name STRING,
| age INT
|)WITH(
| 'connector' = 'kafka',
| 'topic' = 'dt001',
| 'properties.bootstrap.servers' = '192.168.121.128:9092',
| 'properties.group.id' = 'gid-sql-1',
| 'scan.startup.mode' = 'group-offsets',
| 'properties.auto.offset.reset' = 'latest',
| 'format' = 'json',
| 'json.fail-on-missing-field' = 'false',
| 'json.ignore-parse-errors' = 'true'
|)
|""".stripMargin
tEnv.executeSql(inTableSql)
//创建输出表
val outTableSql =
"""
|CREATE TABLE kafka_sink(
| name STRING,
| age INT
|)WITH(
| 'connector' = 'kafka',
| 'topic' = 'dt002',
| 'properties.bootstrap.servers' = '192.168.121.128:9092',
| 'format' = 'json',
| 'sink.partitioner' = 'default'
|)
|""".stripMargin
tEnv.executeSql(outTableSql)
//业务逻辑
val execSql_append =
"""
|INSERT INTO kafka_sink
|SELECT
| name,
| age
|FROM kafka_source
|WHERE age > 10
|""".stripMargin
tEnv.executeSql(execSql_append)
}
}
(2)下面是实现同样功能的 Java 语言代码:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
public class KafkaSourceSinkSQLJava {
public static void main(String[] args) {
// 创建执行环境
EnvironmentSettings settings = EnvironmentSettings.newInstance()
// 指定执行模式,支持 inBatchMode 和 inStreamingMode
.inStreamingMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// 创建输入表
String inTableSql = "CREATE TABLE kafka_source (" +
" name STRING," +
" age INT" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'dt001'," +
" 'properties.bootstrap.servers' = '192.168.121.128:9092'," +
" 'properties.group.id' = 'gid-sql-1'," +
" 'scan.startup.mode' = 'group-offsets'," +
" 'properties.auto.offset.reset' = 'latest'," +
" 'format' = 'json'," +
" 'json.fail-on-missing-field' = 'false'," +
" 'json.ignore-parse-errors' = 'true'" +
")";
tEnv.executeSql(inTableSql);
// 创建输出表
String outTableSql = "CREATE TABLE kafka_sink (" +
" name STRING," +
" age INT" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'dt002'," +
" 'properties.bootstrap.servers' = '192.168.121.128:9092'," +
" 'format' = 'json'," +
" 'sink.partitioner' = 'default'" +
")";
tEnv.executeSql(outTableSql);
// 业务逻辑
String execSqlAppend = "INSERT INTO kafka_sink " +
"SELECT " +
" name, " +
" age " +
"FROM kafka_source " +
"WHERE age > 10";
tEnv.executeSql(execSqlAppend);
}
}
3,运行测试
(1)首先我们创建 dt001、dt002 这两个 topic:
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic dt001 kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic dt002
(2)然后我们开启一个基于控制台的生产者:
kafka-console-producer.sh --broker-list localhost:9092 --topic dt001
(3)接着我们开启一个基于控制台的消费者:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dt002
(4)启动程序,然后我们在生产者中依次模拟产生如下三条数据:
{"name":"hangge","age":19}
{"name":"小李","age":10}
{"name":"老王","age":99}
(5)然后在消费者这边可以看到消费的数据如下。这样就说明这个基于 Kafka 的 FlinkSQL 任务正常执行了。

附:删除表
1,什么情况下需要删除表?
(1)针对本文的样例场景其实不需要删除表,因为这个表是临时的,只在当前 Flink 任务内部有效,任务执行结果后,我们定义的表就没有了,下次想要使用的话需要重启任务,这样就会自动重新创建表。
(2)如果我们能够将表的元数据信息,也就是表的 schema 信息持久化存储起来,那么删除表就有意义了
2,如何删除表?
如果在 Flink 任务中需要去删除某个表,可以使用 drop table 语句。
tEnv.executeSql("DROP TABLE kafka_sink")
全部评论(0)