Flink SQL - 窗口聚合使用详解(滚动窗口、滑动窗口、Watermark)
作者:hangge | 2025-04-23 08:30
窗口聚合语句可以使用 Group Window Aggregation 或者 Windowing TVF。Group Window Aggregation 是 Flink1.13 版本之前提供的解决方案,这个方案在 Flink1.13 版本被标记为过时了,不建议使用了。从 Flink1.13 版本开始,官方建议使用 Windowing TVF。下面我将通过样例演示窗口聚合语句的具体使用。
(2)执行代码,控制台输出结果如下。注意等待查看第二次窗口触发时的结果,可以看到 order_cnt 字段的值累加等于 10,因为每秒产生 1 条数据,所以一个窗口内会产生 10 条数据。
(2)执行代码,控制台输出结果如下。注意等待查看第二次窗口触发时的结果,可以看到 order_cnt 字段的值累加等于 10,因为每秒产生 1 条数据,所以一个窗口内会产生 10 条数据。

(2)执行代码,控制台输出结果如下。注意等待查看第二次窗口触发时的结果,可以看到 order_cnt 字段的值累加等于 10,因为每秒产生 1 条数据,所以一个窗口内会产生 10 条数据。

(2)执行代码,控制台输出结果如下,可以发现 Windowing TVF 和 GroupWindowAggregation 方式产生的结果是一样的。注意等待查看第二次窗口触发时的结果,可以看到 order_cnt 字段的值累加等于 10,因为每秒产生 1 条数据,所以一个窗口内会产生 10 条数据。

(2)下面是使用 Java 语言实现同样的功能:
注意:目前 Windowing TVF 只支持流处理任务,针对批处理任务还是需要使用 Group Window Aggregation。
一、滚动窗口
1,使用 GroupWindowAggregation 实现
(1)Group Window Aggregation 这种方式其实就是把滚动窗口定义在了 GROUP BY 子句中,相当于按照窗口进行分组,这样就可以在 SELECT 语句后面实现聚合操作了。
- 下面是 Scala 语言实现代码:
import java.time.ZoneId import org.apache.flink.configuration.CoreOptions import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment} object TumbleWindowByGroupWindowAggSQL { def main(args: Array[String]): Unit = { //创建执行环境 val settings = EnvironmentSettings .newInstance() //指定执行模式,支持inBatchMode和inStreamingMode .inStreamingMode() .build() val tEnv = TableEnvironment.create(settings) //指定国内的时区 tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai")) //设置全局并行度为1 tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1") //创建输入表 val inTableSql = """ |CREATE TABLE orders_source( | order_id BIGINT, | order_type STRING, | price DECIMAL(10,2), | -- 定义一个时间字段,使用数据处理时间 | order_time AS PROCTIME() |) WITH ( | 'connector' = 'datagen', | 'rows-per-second' = '1', | 'fields.order_type.length' = '1' |) |""".stripMargin tEnv.executeSql(inTableSql) //业务逻辑 val execSql = """ |SELECT | order_type, | COUNT(*) AS order_cnt, | SUM(price) AS price_sum, | -- 窗口开始时间 | TUMBLE_START(order_time,INTERVAL '10' SECOND) AS window_start, | -- 窗口结束时间 | TUMBLE_END(order_time,INTERVAL '10' SECOND) AS window_end |FROM orders_source |GROUP BY | -- TUMBLE代表是滚动窗口,第一个参数是时间字段,第二个参数是滚动窗口大小(10秒) | TUMBLE(order_time,INTERVAL '10' SECOND), | order_type |""".stripMargin tEnv.executeSql(execSql).print() } }
- 下面是 Java 语言实现代码:
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import java.time.ZoneId; public class TumbleWindowByGroupWindowAggSQLJava { public static void main(String[] args) { // 创建执行环境 EnvironmentSettings settings = EnvironmentSettings .newInstance() // 指定执行模式,支持inBatchMode和inStreamingMode .inStreamingMode() .build(); TableEnvironment tEnv = TableEnvironment.create(settings); // 指定国内的时区 tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); // 设置全局并行度为1 tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM.key(),"1"); // 创建输入表 String inTableSql = "" + "CREATE TABLE orders_source (\n" + " order_id BIGINT,\n" + " order_type STRING,\n" + " price DECIMAL(10,2),\n" + " -- 定义一个时间字段,使用数据处理时间\n" + " order_time AS PROCTIME()\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1',\n" + " 'fields.order_type.length' = '1'\n" + ")"; tEnv.executeSql(inTableSql); // 业务逻辑 String execSql = "" + "SELECT\n" + " order_type,\n" + " COUNT(*) AS order_cnt,\n" + " SUM(price) AS price_sum,\n" + " -- 窗口开始时间\n" + " TUMBLE_START(order_time, INTERVAL '10' SECOND) AS window_start,\n" + " -- 窗口结束时间\n" + " TUMBLE_END(order_time, INTERVAL '10' SECOND) AS window_end\n" + "FROM orders_source\n" + "GROUP BY\n" + " -- TUMBLE代表是滚动窗口,第一个参数是时间字段,第二个参数是滚动窗口大小(10秒)\n" + " TUMBLE(order_time, INTERVAL '10' SECOND),\n" + " order_type"; tEnv.executeSql(execSql).print(); } }
(2)执行代码,控制台输出结果如下。注意等待查看第二次窗口触发时的结果,可以看到 order_cnt 字段的值累加等于 10,因为每秒产生 1 条数据,所以一个窗口内会产生 10 条数据。

2,使用 WindowingTVF 实现
(1)Windowing TVF 这种方式其实就是把滚动窗口定义在了数据源的 TABLE 子句中。此时只需要在 GROUP BY 子句中直接对 window_start、window_end、以及业务字段进行分组即可,这样就可以对指定滚动时间窗口内的数据进行分组聚合了。
- 下面是 Scala 语言代码:
注意:这里面的 execSql1 中的 SQL 语句是为了验证 WindowTVF 中都包含哪些字段。


import java.time.ZoneId import org.apache.flink.configuration.CoreOptions import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment} object TumbleWindowByWindowingTVFSQL { def main(args: Array[String]): Unit = { //创建执行环境 val settings = EnvironmentSettings .newInstance() //指定执行模式,支持inBatchMode和inStreamingMode .inStreamingMode() .build() val tEnv = TableEnvironment.create(settings) //指定国内的时区 tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai")) //设置全局并行度为1 tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1") //创建输入表 val inTableSql = """ |CREATE TABLE orders_source ( | order_id BIGINT, | order_type STRING, | price DECIMAL(10,2), | -- 定义一个时间字段,使用数据处理时间 | order_time AS PROCTIME() |) WITH ( | 'connector' = 'datagen', | 'rows-per-second' = '1', | 'fields.order_type.length' = '1' |) |""".stripMargin tEnv.executeSql(inTableSql) //验证WindowingTVF中都包含哪些字段 val execSql1 = """ |SELECT | * |FROM TABLE(-- 把TUMBLE WINDOW的信息定义在数据源的TABLE子句中 | TUMBLE( | TABLE orders_source, -- 指定输入表名称 | DESCRIPTOR(order_time), -- 指定时间字段 | INTERVAL '10' SECOND -- 指定滚动窗口的大小 | ) |) |""".stripMargin //tEnv.executeSql(execSql1).print() //业务逻辑 val execSql2 = """ |SELECT | order_type, | COUNT(*) AS order_cnt, | SUM(price) AS price_sum, | -- 窗口开始时间 | window_start, | -- 窗口结束时间 | window_end |FROM TABLE(-- 把TUMBLE WINDOW的信息定义在数据源的TABLE子句中 | TUMBLE( | TABLE orders_source, -- 指定输入表名称 | DESCRIPTOR(order_time), -- 指定时间字段 | INTERVAL '10' SECOND -- 指定滚动窗口的大小 | ) |) |GROUP BY | window_start, | window_end, | order_type |""".stripMargin tEnv.executeSql(execSql2).print() } }
- 下面是 Java 语言代码:
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import java.time.ZoneId; public class TumbleWindowByWindowingTVFSQLJava { public static void main(String[] args) { // 创建执行环境 EnvironmentSettings settings = EnvironmentSettings .newInstance() // 指定执行模式,支持 inBatchMode 和 inStreamingMode .inStreamingMode() .build(); TableEnvironment tEnv = TableEnvironment.create(settings); // 指定国内的时区 tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); // 设置全局并行度为1 tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM.key(),"1"); // 创建输入表 String inTableSql = "" + "CREATE TABLE orders_source (\n" + " order_id BIGINT,\n" + " order_type STRING,\n" + " price DECIMAL(10,2),\n" + " -- 定义一个时间字段,使用数据处理时间\n" + " order_time AS PROCTIME()\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1',\n" + " 'fields.order_type.length' = '1'\n" + ")"; tEnv.executeSql(inTableSql); // 验证 WindowingTVF 中都包含哪些字段 String execSql1 = "" + "SELECT\n" + " *\n" + "FROM TABLE(-- 把 TUMBLE WINDOW 的信息定义在数据源的 TABLE 子句中\n" + " TUMBLE(\n" + " TABLE orders_source, -- 指定输入表名称\n" + " DESCRIPTOR(order_time), -- 指定时间字段\n" + " INTERVAL '10' SECOND -- 指定滚动窗口的大小\n" + " )\n" + ")"; // tEnv.executeSql(execSql1).print(); // 业务逻辑 String execSql2 = "" + "SELECT\n" + " order_type,\n" + " COUNT(*) AS order_cnt,\n" + " SUM(price) AS price_sum,\n" + " -- 窗口开始时间\n" + " window_start,\n" + " -- 窗口结束时间\n" + " window_end\n" + "FROM TABLE(-- 把 TUMBLE WINDOW 的信息定义在数据源的 TABLE 子句中\n" + " TUMBLE(\n" + " TABLE orders_source, -- 指定输入表名称\n" + " DESCRIPTOR(order_time), -- 指定时间字段\n" + " INTERVAL '10' SECOND -- 指定滚动窗口的大小\n" + " )\n" + ")\n" + "GROUP BY\n" + " window_start,\n" + " window_end,\n" + " order_type"; tEnv.executeSql(execSql2).print(); } }
提示:可以发现 Windowing TVF 和 GroupWindowAggregation 方式产生的结果是一样的。相对比而言,Windowing TVF 这种方式使用起来更加简洁,特别是在 SELECT 中获取 window_start 和 window_end 字段的时候。
并且 Windowing TVF 中还可以支持一些 GroupWindowAggregation 不支持的高级用法,例如 Grouping Sets,大家感兴趣的话可以查阅资料了解一下。
所以目前官方推荐使用 Windowing TVF,大家重点掌握这种方式的使用即可。

二、滑动窗口
1,使用 GroupWindowAggregation 实现
(1)Group Window Aggregation 这种方式其实就是把滑动窗口定义在了 GROUP BY 子句中,该样例窗口的大小为 10 秒,滑动步长为 5 秒。
- 下面是 Scala 语言实现代码:
import java.time.ZoneId import org.apache.flink.configuration.CoreOptions import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment} object HopWindowByGroupWindowAggSQLJava { def main(args: Array[String]): Unit = { //创建执行环境 val settings = EnvironmentSettings .newInstance() //指定执行模式,支持inBatchMode和inStreamingMode .inStreamingMode() .build() val tEnv = TableEnvironment.create(settings) //指定国内的时区 tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai")) //设置全局并行度为1 tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1") //创建输入表 val inTableSql = """ |CREATE TABLE orders_source( | order_id BIGINT, | order_type STRING, | price DECIMAL(10,2), | -- 定义一个时间字段,使用数据处理时间 | order_time AS PROCTIME() |) WITH ( | 'connector' = 'datagen', | 'rows-per-second' = '1', | 'fields.order_type.length' = '1' |) |""".stripMargin tEnv.executeSql(inTableSql) //业务逻辑 val execSql = """ |SELECT | order_type, | COUNT(*) AS order_cnt, | SUM(price) AS price_sum, | -- 窗口开始时间 | HOP_START(order_time,INTERVAL '5' SECOND,INTERVAL '10' SECOND) AS window_start, | -- 窗口结束时间 | HOP_END(order_time,INTERVAL '5' SECOND,INTERVAL '10' SECOND) AS window_end |FROM orders_source |GROUP BY | -- HOP代表是滑动窗口 | -- 第一个参数是时间字段 | -- 第二个参数是滑动窗口的滑动步长(5秒) | -- 第三个参数是滑动窗口大小(10秒) | HOP(order_time,INTERVAL '5' SECOND,INTERVAL '10' SECOND), | order_type |""".stripMargin tEnv.executeSql(execSql).print() } }
- 下面是 Java 语言代码:
import java.time.ZoneId; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; public class HopWindowByGroupWindowAggSQLJava { public static void main(String[] args) { // 创建执行环境 EnvironmentSettings settings = EnvironmentSettings .newInstance() // 指定执行模式,支持inBatchMode和inStreamingMode .inStreamingMode() .build(); TableEnvironment tEnv = TableEnvironment.create(settings); // 指定国内的时区 tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); // 设置全局并行度为1 tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM.key(),"1"); // 创建输入表 String inTableSql = "CREATE TABLE orders_source( \n" + " order_id BIGINT, \n" + " order_type STRING, \n" + " price DECIMAL(10,2), \n" + " -- 定义一个时间字段,使用数据处理时间 \n" + " order_time AS PROCTIME() \n" + ") WITH ( \n" + " 'connector' = 'datagen', \n" + " 'rows-per-second' = '1', \n" + " 'fields.order_type.length' = '1' \n" + ")"; tEnv.executeSql(inTableSql); // 业务逻辑 String execSql = "SELECT " + " order_type, \n" + " COUNT(*) AS order_cnt, \n" + " SUM(price) AS price_sum, \n" + " -- 窗口开始时间 \n" + " HOP_START(order_time, INTERVAL '5' SECOND, INTERVAL '10' SECOND) " + "AS window_start, \n" + " -- 窗口结束时间 \n" + " HOP_END(order_time, INTERVAL '5' SECOND, INTERVAL '10' SECOND) " + "AS window_end \n" + "FROM orders_source \n" + "GROUP BY \n" + " -- HOP代表是滑动窗口 \n" + " -- 第一个参数是时间字段 \n" + " -- 第二个参数是滑动窗口的滑动步长(5秒) \n" + " -- 第三个参数是滑动窗口大小(10秒) \n" + " HOP(order_time, INTERVAL '5' SECOND, INTERVAL '10' SECOND), \n" + " order_type"; tEnv.executeSql(execSql).print(); } }

2,使用 WindowingTVF 实现
(1)Windowing TVF 这种方式其实就是把滑动窗口定义在了数据源的 TABLE 子句中。此时只需要在 GROUP BY 子句中直接对 window_start、window_end、以及业务字段进行分组即可,这样就可以对指定滑动时间窗口内的数据进行分组聚合了。
- 下面是 Scala 语言代码:
import java.time.ZoneId import org.apache.flink.configuration.CoreOptions import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment} object HopWindowByWindowingTVFSQL { def main(args: Array[String]): Unit = { //创建执行环境 val settings = EnvironmentSettings .newInstance() //指定执行模式,支持inBatchMode和inStreamingMode .inStreamingMode() .build() val tEnv = TableEnvironment.create(settings) //指定国内的时区 tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai")) //设置全局并行度为1 tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1") //创建输入表 val inTableSql = """ |CREATE TABLE orders_source ( | order_id BIGINT, | order_type STRING, | price DECIMAL(10,2), | -- 定义一个时间字段,使用数据处理时间 | order_time AS PROCTIME() |) WITH ( | 'connector' = 'datagen', | 'rows-per-second' = '1', | 'fields.order_type.length' = '1' |) |""".stripMargin tEnv.executeSql(inTableSql) //业务逻辑 val execSql2 = """ |SELECT | order_type, | COUNT(*) AS order_cnt, | SUM(price) AS price_sum, | -- 窗口开始时间 | window_start, | -- 窗口结束时间 | window_end |FROM TABLE(-- 把HOP WINDOW的信息定义在数据源的TABLE子句中 | HOP( | TABLE orders_source, -- 指定输入表名称 | DESCRIPTOR(order_time), -- 指定时间字段 | INTERVAL '5' SECOND, -- 指定滑动窗口步长 | INTERVAL '10' SECOND -- 指定滑动窗口的大小 | ) |) |GROUP BY | window_start, | window_end, | order_type |""".stripMargin tEnv.executeSql(execSql2).print() } }
- 下面是 Java 语言代码:
import java.time.ZoneId; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; public class HopWindowByWindowingTVFSQLJava { public static void main(String[] args) { // 创建执行环境 EnvironmentSettings settings = EnvironmentSettings .newInstance() // 指定执行模式,支持inBatchMode和inStreamingMode .inStreamingMode() .build(); TableEnvironment tEnv = TableEnvironment.create(settings); // 指定国内的时区 tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); // 设置全局并行度为1 tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM.key(),"1"); // 创建输入表 String inTableSql = "" + "CREATE TABLE orders_source (\n" + " order_id BIGINT,\n" + " order_type STRING,\n" + " price DECIMAL(10,2),\n" + " -- 定义一个时间字段,使用数据处理时间\n" + " order_time AS PROCTIME()\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1',\n" + " 'fields.order_type.length' = '1'\n" + ")"; tEnv.executeSql(inTableSql); // 业务逻辑 String execSql2 = "SELECT " + " order_type, \n" + " COUNT(*) AS order_cnt, \n" + " SUM(price) AS price_sum, \n" + " -- 窗口开始时间 \n" + " window_start, \n" + " -- 窗口结束时间 \n" + " window_end \n" + "FROM TABLE( \n" + " -- 把HOP WINDOW的信息定义在数据源的TABLE子句中 \n" + " HOP( \n" + " TABLE orders_source, -- 指定输入表名称 \n" + " DESCRIPTOR(order_time), -- 指定时间字段 \n" + " INTERVAL '5' SECOND, -- 指定滑动窗口步长 \n" + " INTERVAL '10' SECOND -- 指定滑动窗口的大小 \n" + " ) \n" + ") \n" + "GROUP BY \n" + " window_start, \n" + " window_end, \n" + " order_type"; tEnv.executeSql(execSql2).print(); } }

附:滚动窗口 + Watermark
1,样例代码
(1)这里以滚动窗口为例,使用事件时间 + Watermark 解决实时数据乱序问题。下面是 Scala 语言代码:
注意:正常情况下事件时间 order_time 的值需要从原始数据中解析获取,在这里为了演示方便直接使用 CURRENT_TIMESTAMP 时间函数自动生成了。
import java.time.ZoneId import org.apache.flink.configuration.CoreOptions import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment} object TumbleWindowByWindowingTVFUseWatermarkSQL { def main(args: Array[String]): Unit = { //创建执行环境 val settings = EnvironmentSettings .newInstance() //指定执行模式,支持inBatchMode和inStreamingMode .inStreamingMode() .build() val tEnv = TableEnvironment.create(settings) //指定国内的时区 tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai")) //设置全局并行度为1 tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1") //创建输入表 val inTableSql = """ |CREATE TABLE orders_source ( | order_id BIGINT, | order_type STRING, | price DECIMAL(10,2), | -- 定义一个时间字段,作为事件时间 | order_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP_LTZ(3)), | -- 设置Watermark | WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND |) WITH ( | 'connector' = 'datagen', | 'rows-per-second' = '1', | 'fields.order_type.length' = '1' |) |""".stripMargin tEnv.executeSql(inTableSql) //业务逻辑 val execSql2 = """ |SELECT | order_type, | COUNT(*) AS order_cnt, | SUM(price) AS price_sum, | -- 窗口开始时间 | window_start, | -- 窗口结束时间 | window_end |FROM TABLE(-- 把TUMBLE WINDOW的信息定义在数据源的TABLE子句中 | TUMBLE( | TABLE orders_source, -- 指定输入表名称 | DESCRIPTOR(order_time), -- 指定时间字段 | INTERVAL '10' SECOND -- 指定滚动窗口的大小 | ) |) |GROUP BY | window_start, | window_end, | order_type |""".stripMargin tEnv.executeSql(execSql2).print() } }
(2)下面是使用 Java 语言实现同样的功能:
import java.time.ZoneId; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; public class TumbleWindowByWindowingTVFUseWatermarkSQLJava { public static void main(String[] args) { // 创建执行环境 EnvironmentSettings settings = EnvironmentSettings .newInstance() // 指定执行模式,支持inBatchMode和inStreamingMode .inStreamingMode() .build(); TableEnvironment tEnv = TableEnvironment.create(settings); // 指定国内的时区 tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); // 设置全局并行度为1 tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM.key(),"1"); // 创建输入表 String inTableSql = "CREATE TABLE orders_source ( \n" + " order_id BIGINT, \n" + " order_type STRING, \n" + " price DECIMAL(10,2), \n" + " -- 定义一个时间字段,作为事件时间 \n" + " order_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP_LTZ(3)), \n" + " -- 设置Watermark \n" + " WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND \n" + ") WITH ( \n" + " 'connector' = 'datagen', \n" + " 'rows-per-second' = '1', \n" + " 'fields.order_type.length' = '1' \n" + ")"; tEnv.executeSql(inTableSql); // 业务逻辑 String execSql2 = "SELECT \n" + " order_type, \n" + " COUNT(*) AS order_cnt, \n" + " SUM(price) AS price_sum, \n" + " -- 窗口开始时间 \n" + " window_start, \n" + " -- 窗口结束时间 \n" + " window_end \n" + "FROM TABLE( \n" + " -- 把TUMBLE WINDOW的信息定义在数据源的TABLE子句中 \n" + " TUMBLE( \n" + " TABLE orders_source, -- 指定输入表名称 \n" + " DESCRIPTOR(order_time), -- 指定时间字段 \n" + " INTERVAL '10' SECOND -- 指定滚动窗口的大小 \n" + " ) \n" + ") \n" + "GROUP BY \n" + " window_start, \n" + " window_end, \n" + " order_type"; tEnv.executeSql(execSql2).print(); } }
2,运行测试
执行代码,同时打开系统的时间组件。可以看到窗口会延迟 5 秒钟触发,因为目前 Watermark 设置允许数据最大乱序时间是 5 秒。

全部评论(0)