Flink SQL - 动态表转换为输出流教程详解(Append-only流、Retract流、Upsert流)
作者:hangge | 2025-04-19 09:37
当我们想要把动态输出表中的数据转换为输出数据流,或者将其输出到外部存储系统中的时候,需要对这个动态输出表中的数据行为进行编码。
Flink SQL 支持三种编码方式来体现一个动态表的变化,这三种编码方式对应的是三种数据流:
- Append-only 流,中文翻译为:仅追加流,也就是说只有新增行为。
- Retract 流,中文翻译为:回撤流、或者撤回流,都是一个意思,他里面包括新增、更新和删除行为。
- Upsert 流,中文翻译为:插入流,也包括新增、更新和删除行为。
本文我将通过样例详细分析一下这几种数据流。
一、Append-only 流
1,基本介绍
(1)Append-only 这种流是仅追加的,表示输出流中只有 INSERT 类型的操作。
(2)我们前面分析的基于时间窗口的聚合操作产生的数据其实对应的就是 Append-only 这种数据流(点击查看),或者是不涉及分组聚合的需求,普通的实时 ETL 类型的 Flink SQL 也只会产生 Append-only 这种数据流。
2,样例代码
(1)下面是一个 Append-only 流样例,使用的是 Scala 语言。
- 为了便于观察数据变化,所以把任务的并行度设置为了 1。
- Source 我们使用了 DataGen,这个组件可以自动随机产生数据,相当于是一个无界的数据流,测试的时候使用比较方便。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.Schema import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import org.apache.flink.table.connector.ChangelogMode object AppendOnlySQL { def main(args: Array[String]): Unit = { //由于需要将Table转为DataStream,所以需要使用StreamTableEnvironment //创建执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env) //设置全局默认并行度 env.setParallelism(1) //创建输入表 val inTableSql = """ |CREATE TABLE orders ( | order_id BIGINT, | price DECIMAL(10,2), | order_time TIMESTAMP |) WITH ( | 'connector' = 'datagen', | 'rows-per-second' = '1' |) |""".stripMargin tEnv.executeSql(inTableSql) //执行SQL查询操作 val resTable = tEnv.sqlQuery("SELECT * FROM orders") //将结果转换为DataStream数据流 val resStream = tEnv.toChangelogStream(resTable, Schema.newBuilder().build(), ChangelogMode.insertOnly() ) //打印DataStream数据流中的数据 resStream.print() //执行 env.execute("AppendOnlySQL") } }
(2)下面是使用 Java 语言实现同样功能:
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.types.Row; public class AppendOnlySQLJava { public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // 设置全局默认并行度 env.setParallelism(1); // 创建输入表 String inTableSql = "CREATE TABLE orders (\n" + " order_id BIGINT,\n" + " price DECIMAL(10,2),\n" + " order_time TIMESTAMP\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1'\n" + ")"; tEnv.executeSql(inTableSql); // 执行 SQL 查询操作 Table resTable = tEnv.sqlQuery("SELECT * FROM orders"); // 将结果转换为 DataStream 数据流 DataStream<Row> resStream = tEnv.toChangelogStream(resTable, Schema.newBuilder().build(), ChangelogMode.insertOnly() ); // 打印 DataStream 数据流中的数据 resStream.print(); // 执行 env.execute("AppendOnlySQL"); } }
3,运行测试
(1)执行代码,可以看到控制台输出内容如下:

(2)此时结果中的数据前面都有一个标识 +I,表示是 INSERT 的意思。这就是 Append-only 数据流的使用场景,只有新增。
二、Retract 流
1,基本介绍
(1)Retract 流也可以称之为回撤流,或者是撤回流,都是一个意思。
- Retract 流里面会包含新增(INSERT)、更新(UPDATE)和删除(DELETE)。
- 针对更新这种行为,对应的是 UPDATE_BEFORE(-U 回撤老数据) 和 UPDATE_AFTER(+U 添加新数据)。
(2)我们可以通过下图加深一下对 Retract 流的理解。这里的 Flink SQL 中实现了分组聚合的计算逻辑,根据 name 字段进行分组聚合,只要后续新增数据的 name 字段相同,就会更新之前的结果数据,只不过他的更新不是直接更新,而是先回撤之前的老数据,再添加新的数据。
- 针对 name=Tom 这条数据,第一次来的时候会产生一条结果数据:+I [Tom,1]
- 当 name=Tom 这条数据第二次来的时候,会产生两条结果数据:-U [Tom,1] 和 +U [Tom,2],这样表示回撤之前的老数据 Tom,1,再添加新的数据 Tom,2。
- 如果下游还有任务去消费这些数据流的话,需要正确处理 +I, -U 和 +U 这几种类型的数据,防止数据计算重复出现错误。

2,样例代码
(1)下面是一个使用 Scala 语言编写的 Retract 流样例:
- 由于 Flink SQL 中使用了 GROUP BY 操作,为了保证结果有意义,所以需要保证数据源中会产生相同 order_id 的数据,这样才会触发更新操作。
- 所以在 datagen 这个 Source 组件中定义了 order_id 这个字段的取值范围(100~105),这样 datagen 在生成数据的时候就会按照这个取值范围生成 order_id,这样数据原始数据就会出现重复的了,分组操作就有意义了。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import org.apache.flink.table.api.{EnvironmentSettings, Schema, TableEnvironment} import org.apache.flink.table.connector.ChangelogMode object RetractSQL { def main(args: Array[String]): Unit = { //由于需要将Table转为DataStream,所以需要使用StreamTableEnvironment //创建执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env) //设置全局默认并行度 env.setParallelism(1) //创建输入表 val inTableSql = """ |CREATE TABLE orders ( | order_id BIGINT, | price DECIMAL(10,2), | order_time TIMESTAMP |) WITH ( | 'connector' = 'datagen', | 'rows-per-second' = '1', | 'fields.order_id.min' = '100', | 'fields.order_id.max' = '105' |) |""".stripMargin tEnv.executeSql(inTableSql) //执行SQL查询操作 val resTable = tEnv.sqlQuery("SELECT order_id, COUNT(*) AS cnt FROM orders GROUP BY order_id") //将结果转换为DataStream数据流 val resStream = tEnv.toChangelogStream(resTable, Schema.newBuilder().build(), ChangelogMode.all() ) //打印DataStream数据流中的数据 resStream.print() //执行 env.execute("RetractSQL") } }
(2)下面是使用 Java 语言实现同样的功能:
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.types.Row; public class RetractSQLJava { public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // 设置全局默认并行度 env.setParallelism(1); // 创建输入表 String inTableSql = "CREATE TABLE orders (\n" + " order_id BIGINT,\n" + " price DECIMAL(10,2),\n" + " order_time TIMESTAMP\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1',\n" + " 'fields.order_id.min' = '100',\n" + " 'fields.order_id.max' = '105'\n" + ")"; tEnv.executeSql(inTableSql); // 执行 SQL 查询操作 Table resTable = tEnv.sqlQuery("SELECT order_id, " + "COUNT(*) AS cnt FROM orders GROUP BY order_id"); // 将结果转换为 DataStream 数据流 DataStream<Row> resStream = tEnv.toChangelogStream(resTable, Schema.newBuilder().build(), ChangelogMode.all() ); // 打印 DataStream 数据流中的数据 resStream.print(); // 执行 env.execute("RetractSQL"); } }
3,运行测试
(1)执行代码,控制台输出内容如下。此时结果中的数据前面会出现标识 +I、-U 和 +U。
- +I 的意思是 INSERT。
- -U 的意思是 UPDATE_BEFORE
- +U 的意思是 UPDATE_AFTER

(2)针对新增数据而言,会产生一条 +I 标识的数据,当触发更新操作的时候会产生两条数据:-U 和 +U,因为更新操作会先回撤之前的老数据,再添加新数据。
三、Upsert 流
1,基本介绍
(1)Upsert 流和 Retract 流有点类似,想要把动态表转换为 Upsert 流的前提是动态表中需要指定主键,也就是我们平时所说的唯一键,唯一键可以由 1 个或者多个字段组合而成。
- Upsert 流里面会包含新增(INSERT)、更新(UPDATE)和删除(DELETE)。
- 新增、删除和前面的没什么区别,主要区别在于更新。
- 更新:UPDATE,对应的是 UPDATE_AFTER(+U),没有 UPDATE_BEFORE。
- Upsert 流中的更新操作对应的是一条数据记录,而 Retract 流中的更新操作对应的是两条数据记录。
- 所以在更新数据的时候,转换为 Upsert 流的效率要高于 Retract 流。
(2)我们可以通过下图来加深对 Upsert 流的理解。这里的 Flink SQL 中同样也是实现了分组聚合的计算逻辑。
- 针对 name=Tom 这条数据,第一次来的时候会产生一条结果数据:+I [Tom,1]
- 当 name=Tom 这条数据第二次来的时候,只会产生 1 条结果数据:+U [Tom,2],相当于直接对之前的数据进行了更新,没有涉及到回撤数据的操作。

2,样例代码
(1)下面是一个使用 Scala 语言编写的 Upsert 流样例:
注意:在 Schema 中将 order_id 设置为主键之后,需要在数据源中将 order_id 声明为 NOT NULL,否则会报错,因为主键是非空的,所以需要定义。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.Schema import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import org.apache.flink.table.connector.ChangelogMode object UpsertSQL { def main(args: Array[String]): Unit = { //由于需要将Table转为DataStream,所以需要使用StreamTableEnvironment //创建执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env) //设置全局默认并行度 env.setParallelism(1) //创建输入表 val inTableSql = """ |CREATE TABLE orders ( | order_id BIGINT NOT NULL, | price DECIMAL(10,2), | order_time TIMESTAMP |) WITH ( | 'connector' = 'datagen', | 'rows-per-second' = '1', | 'fields.order_id.min' = '100', | 'fields.order_id.max' = '105' |) |""".stripMargin tEnv.executeSql(inTableSql) //执行SQL查询操作 val resTable = tEnv.sqlQuery("SELECT order_id, COUNT(*) AS cnt FROM orders GROUP BY order_id") //将结果转换为DataStream数据流 val resStream = tEnv.toChangelogStream(resTable, Schema.newBuilder().primaryKey("order_id").build(), ChangelogMode.upsert() ) //打印DataStream数据流中的数据 resStream.print() //执行 env.execute("UpsertSQL") } }
(2)下面是使用 Java 语言实现同样功能:
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.types.Row; public class UpsertSQLJava { public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // 设置全局默认并行度 env.setParallelism(1); // 创建输入表 String inTableSql = "CREATE TABLE orders (\n" + " order_id BIGINT NOT NULL,\n" + " price DECIMAL(10,2),\n" + " order_time TIMESTAMP\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1',\n" + " 'fields.order_id.min' = '100',\n" + " 'fields.order_id.max' = '105'\n" + ")"; tEnv.executeSql(inTableSql); // 执行 SQL 查询操作 Table resTable = tEnv.sqlQuery("SELECT order_id, " + "COUNT(*) AS cnt FROM orders GROUP BY order_id"); // 将结果转换为 DataStream 数据流 DataStream<Row> resStream = tEnv.toChangelogStream( resTable, Schema.newBuilder().primaryKey("order_id").build(), ChangelogMode.upsert() ); // 打印 DataStream 数据流中的数据 resStream.print(); // 执行 env.execute("UpsertSQL"); } }
3,运行测试
执行代码,控制台输出内容如下。此时结果中的数据前面会出现标识 +I 和 +U,没有 -U

全部评论(0)