Flink SQL - 窗口聚合使用详解(滚动窗口、滑动窗口、Watermark)
作者:hangge | 2025-04-23 08:30
窗口聚合语句可以使用 Group Window Aggregation 或者 Windowing TVF。Group Window Aggregation 是 Flink1.13 版本之前提供的解决方案,这个方案在 Flink1.13 版本被标记为过时了,不建议使用了。从 Flink1.13 版本开始,官方建议使用 Windowing TVF。下面我将通过样例演示窗口聚合语句的具体使用。
(2)执行代码,控制台输出结果如下。注意等待查看第二次窗口触发时的结果,可以看到 order_cnt 字段的值累加等于 10,因为每秒产生 1 条数据,所以一个窗口内会产生 10 条数据。
(2)执行代码,控制台输出结果如下。注意等待查看第二次窗口触发时的结果,可以看到 order_cnt 字段的值累加等于 10,因为每秒产生 1 条数据,所以一个窗口内会产生 10 条数据。

(2)执行代码,控制台输出结果如下。注意等待查看第二次窗口触发时的结果,可以看到 order_cnt 字段的值累加等于 10,因为每秒产生 1 条数据,所以一个窗口内会产生 10 条数据。

(2)执行代码,控制台输出结果如下,可以发现 Windowing TVF 和 GroupWindowAggregation 方式产生的结果是一样的。注意等待查看第二次窗口触发时的结果,可以看到 order_cnt 字段的值累加等于 10,因为每秒产生 1 条数据,所以一个窗口内会产生 10 条数据。

(2)下面是使用 Java 语言实现同样的功能:
注意:目前 Windowing TVF 只支持流处理任务,针对批处理任务还是需要使用 Group Window Aggregation。
一、滚动窗口
1,使用 GroupWindowAggregation 实现
(1)Group Window Aggregation 这种方式其实就是把滚动窗口定义在了 GROUP BY 子句中,相当于按照窗口进行分组,这样就可以在 SELECT 语句后面实现聚合操作了。
- 下面是 Scala 语言实现代码:
import java.time.ZoneId
import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
object TumbleWindowByGroupWindowAggSQL {
def main(args: Array[String]): Unit = {
//创建执行环境
val settings = EnvironmentSettings
.newInstance()
//指定执行模式,支持inBatchMode和inStreamingMode
.inStreamingMode()
.build()
val tEnv = TableEnvironment.create(settings)
//指定国内的时区
tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
//设置全局并行度为1
tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")
//创建输入表
val inTableSql =
"""
|CREATE TABLE orders_source(
| order_id BIGINT,
| order_type STRING,
| price DECIMAL(10,2),
| -- 定义一个时间字段,使用数据处理时间
| order_time AS PROCTIME()
|) WITH (
| 'connector' = 'datagen',
| 'rows-per-second' = '1',
| 'fields.order_type.length' = '1'
|)
|""".stripMargin
tEnv.executeSql(inTableSql)
//业务逻辑
val execSql =
"""
|SELECT
| order_type,
| COUNT(*) AS order_cnt,
| SUM(price) AS price_sum,
| -- 窗口开始时间
| TUMBLE_START(order_time,INTERVAL '10' SECOND) AS window_start,
| -- 窗口结束时间
| TUMBLE_END(order_time,INTERVAL '10' SECOND) AS window_end
|FROM orders_source
|GROUP BY
| -- TUMBLE代表是滚动窗口,第一个参数是时间字段,第二个参数是滚动窗口大小(10秒)
| TUMBLE(order_time,INTERVAL '10' SECOND),
| order_type
|""".stripMargin
tEnv.executeSql(execSql).print()
}
}
- 下面是 Java 语言实现代码:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import java.time.ZoneId;
public class TumbleWindowByGroupWindowAggSQLJava {
public static void main(String[] args) {
// 创建执行环境
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
// 指定执行模式,支持inBatchMode和inStreamingMode
.inStreamingMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// 指定国内的时区
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
// 设置全局并行度为1
tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM.key(),"1");
// 创建输入表
String inTableSql = ""
+ "CREATE TABLE orders_source (\n"
+ " order_id BIGINT,\n"
+ " order_type STRING,\n"
+ " price DECIMAL(10,2),\n"
+ " -- 定义一个时间字段,使用数据处理时间\n"
+ " order_time AS PROCTIME()\n"
+ ") WITH (\n"
+ " 'connector' = 'datagen',\n"
+ " 'rows-per-second' = '1',\n"
+ " 'fields.order_type.length' = '1'\n"
+ ")";
tEnv.executeSql(inTableSql);
// 业务逻辑
String execSql = ""
+ "SELECT\n"
+ " order_type,\n"
+ " COUNT(*) AS order_cnt,\n"
+ " SUM(price) AS price_sum,\n"
+ " -- 窗口开始时间\n"
+ " TUMBLE_START(order_time, INTERVAL '10' SECOND) AS window_start,\n"
+ " -- 窗口结束时间\n"
+ " TUMBLE_END(order_time, INTERVAL '10' SECOND) AS window_end\n"
+ "FROM orders_source\n"
+ "GROUP BY\n"
+ " -- TUMBLE代表是滚动窗口,第一个参数是时间字段,第二个参数是滚动窗口大小(10秒)\n"
+ " TUMBLE(order_time, INTERVAL '10' SECOND),\n"
+ " order_type";
tEnv.executeSql(execSql).print();
}
}
(2)执行代码,控制台输出结果如下。注意等待查看第二次窗口触发时的结果,可以看到 order_cnt 字段的值累加等于 10,因为每秒产生 1 条数据,所以一个窗口内会产生 10 条数据。

2,使用 WindowingTVF 实现
(1)Windowing TVF 这种方式其实就是把滚动窗口定义在了数据源的 TABLE 子句中。此时只需要在 GROUP BY 子句中直接对 window_start、window_end、以及业务字段进行分组即可,这样就可以对指定滚动时间窗口内的数据进行分组聚合了。
- 下面是 Scala 语言代码:
注意:这里面的 execSql1 中的 SQL 语句是为了验证 WindowTVF 中都包含哪些字段。


import java.time.ZoneId
import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
object TumbleWindowByWindowingTVFSQL {
def main(args: Array[String]): Unit = {
//创建执行环境
val settings = EnvironmentSettings
.newInstance()
//指定执行模式,支持inBatchMode和inStreamingMode
.inStreamingMode()
.build()
val tEnv = TableEnvironment.create(settings)
//指定国内的时区
tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
//设置全局并行度为1
tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")
//创建输入表
val inTableSql =
"""
|CREATE TABLE orders_source (
| order_id BIGINT,
| order_type STRING,
| price DECIMAL(10,2),
| -- 定义一个时间字段,使用数据处理时间
| order_time AS PROCTIME()
|) WITH (
| 'connector' = 'datagen',
| 'rows-per-second' = '1',
| 'fields.order_type.length' = '1'
|)
|""".stripMargin
tEnv.executeSql(inTableSql)
//验证WindowingTVF中都包含哪些字段
val execSql1 =
"""
|SELECT
| *
|FROM TABLE(-- 把TUMBLE WINDOW的信息定义在数据源的TABLE子句中
| TUMBLE(
| TABLE orders_source, -- 指定输入表名称
| DESCRIPTOR(order_time), -- 指定时间字段
| INTERVAL '10' SECOND -- 指定滚动窗口的大小
| )
|)
|""".stripMargin
//tEnv.executeSql(execSql1).print()
//业务逻辑
val execSql2 =
"""
|SELECT
| order_type,
| COUNT(*) AS order_cnt,
| SUM(price) AS price_sum,
| -- 窗口开始时间
| window_start,
| -- 窗口结束时间
| window_end
|FROM TABLE(-- 把TUMBLE WINDOW的信息定义在数据源的TABLE子句中
| TUMBLE(
| TABLE orders_source, -- 指定输入表名称
| DESCRIPTOR(order_time), -- 指定时间字段
| INTERVAL '10' SECOND -- 指定滚动窗口的大小
| )
|)
|GROUP BY
| window_start,
| window_end,
| order_type
|""".stripMargin
tEnv.executeSql(execSql2).print()
}
}- 下面是 Java 语言代码:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import java.time.ZoneId;
public class TumbleWindowByWindowingTVFSQLJava {
public static void main(String[] args) {
// 创建执行环境
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
// 指定执行模式,支持 inBatchMode 和 inStreamingMode
.inStreamingMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// 指定国内的时区
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
// 设置全局并行度为1
tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM.key(),"1");
// 创建输入表
String inTableSql = "" +
"CREATE TABLE orders_source (\n" +
" order_id BIGINT,\n" +
" order_type STRING,\n" +
" price DECIMAL(10,2),\n" +
" -- 定义一个时间字段,使用数据处理时间\n" +
" order_time AS PROCTIME()\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second' = '1',\n" +
" 'fields.order_type.length' = '1'\n" +
")";
tEnv.executeSql(inTableSql);
// 验证 WindowingTVF 中都包含哪些字段
String execSql1 = "" +
"SELECT\n" +
" *\n" +
"FROM TABLE(-- 把 TUMBLE WINDOW 的信息定义在数据源的 TABLE 子句中\n" +
" TUMBLE(\n" +
" TABLE orders_source, -- 指定输入表名称\n" +
" DESCRIPTOR(order_time), -- 指定时间字段\n" +
" INTERVAL '10' SECOND -- 指定滚动窗口的大小\n" +
" )\n" +
")";
// tEnv.executeSql(execSql1).print();
// 业务逻辑
String execSql2 = "" +
"SELECT\n" +
" order_type,\n" +
" COUNT(*) AS order_cnt,\n" +
" SUM(price) AS price_sum,\n" +
" -- 窗口开始时间\n" +
" window_start,\n" +
" -- 窗口结束时间\n" +
" window_end\n" +
"FROM TABLE(-- 把 TUMBLE WINDOW 的信息定义在数据源的 TABLE 子句中\n" +
" TUMBLE(\n" +
" TABLE orders_source, -- 指定输入表名称\n" +
" DESCRIPTOR(order_time), -- 指定时间字段\n" +
" INTERVAL '10' SECOND -- 指定滚动窗口的大小\n" +
" )\n" +
")\n" +
"GROUP BY\n" +
" window_start,\n" +
" window_end,\n" +
" order_type";
tEnv.executeSql(execSql2).print();
}
}
提示:可以发现 Windowing TVF 和 GroupWindowAggregation 方式产生的结果是一样的。相对比而言,Windowing TVF 这种方式使用起来更加简洁,特别是在 SELECT 中获取 window_start 和 window_end 字段的时候。
并且 Windowing TVF 中还可以支持一些 GroupWindowAggregation 不支持的高级用法,例如 Grouping Sets,大家感兴趣的话可以查阅资料了解一下。
所以目前官方推荐使用 Windowing TVF,大家重点掌握这种方式的使用即可。

二、滑动窗口
1,使用 GroupWindowAggregation 实现
(1)Group Window Aggregation 这种方式其实就是把滑动窗口定义在了 GROUP BY 子句中,该样例窗口的大小为 10 秒,滑动步长为 5 秒。
- 下面是 Scala 语言实现代码:
import java.time.ZoneId
import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
object HopWindowByGroupWindowAggSQLJava {
def main(args: Array[String]): Unit = {
//创建执行环境
val settings = EnvironmentSettings
.newInstance()
//指定执行模式,支持inBatchMode和inStreamingMode
.inStreamingMode()
.build()
val tEnv = TableEnvironment.create(settings)
//指定国内的时区
tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
//设置全局并行度为1
tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")
//创建输入表
val inTableSql =
"""
|CREATE TABLE orders_source(
| order_id BIGINT,
| order_type STRING,
| price DECIMAL(10,2),
| -- 定义一个时间字段,使用数据处理时间
| order_time AS PROCTIME()
|) WITH (
| 'connector' = 'datagen',
| 'rows-per-second' = '1',
| 'fields.order_type.length' = '1'
|)
|""".stripMargin
tEnv.executeSql(inTableSql)
//业务逻辑
val execSql =
"""
|SELECT
| order_type,
| COUNT(*) AS order_cnt,
| SUM(price) AS price_sum,
| -- 窗口开始时间
| HOP_START(order_time,INTERVAL '5' SECOND,INTERVAL '10' SECOND) AS window_start,
| -- 窗口结束时间
| HOP_END(order_time,INTERVAL '5' SECOND,INTERVAL '10' SECOND) AS window_end
|FROM orders_source
|GROUP BY
| -- HOP代表是滑动窗口
| -- 第一个参数是时间字段
| -- 第二个参数是滑动窗口的滑动步长(5秒)
| -- 第三个参数是滑动窗口大小(10秒)
| HOP(order_time,INTERVAL '5' SECOND,INTERVAL '10' SECOND),
| order_type
|""".stripMargin
tEnv.executeSql(execSql).print()
}
}
- 下面是 Java 语言代码:
import java.time.ZoneId;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
public class HopWindowByGroupWindowAggSQLJava {
public static void main(String[] args) {
// 创建执行环境
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
// 指定执行模式,支持inBatchMode和inStreamingMode
.inStreamingMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// 指定国内的时区
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
// 设置全局并行度为1
tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM.key(),"1");
// 创建输入表
String inTableSql =
"CREATE TABLE orders_source( \n" +
" order_id BIGINT, \n" +
" order_type STRING, \n" +
" price DECIMAL(10,2), \n" +
" -- 定义一个时间字段,使用数据处理时间 \n" +
" order_time AS PROCTIME() \n" +
") WITH ( \n" +
" 'connector' = 'datagen', \n" +
" 'rows-per-second' = '1', \n" +
" 'fields.order_type.length' = '1' \n" +
")";
tEnv.executeSql(inTableSql);
// 业务逻辑
String execSql =
"SELECT " +
" order_type, \n" +
" COUNT(*) AS order_cnt, \n" +
" SUM(price) AS price_sum, \n" +
" -- 窗口开始时间 \n" +
" HOP_START(order_time, INTERVAL '5' SECOND, INTERVAL '10' SECOND) " +
"AS window_start, \n" +
" -- 窗口结束时间 \n" +
" HOP_END(order_time, INTERVAL '5' SECOND, INTERVAL '10' SECOND) " +
"AS window_end \n" +
"FROM orders_source \n" +
"GROUP BY \n" +
" -- HOP代表是滑动窗口 \n" +
" -- 第一个参数是时间字段 \n" +
" -- 第二个参数是滑动窗口的滑动步长(5秒) \n" +
" -- 第三个参数是滑动窗口大小(10秒) \n" +
" HOP(order_time, INTERVAL '5' SECOND, INTERVAL '10' SECOND), \n" +
" order_type";
tEnv.executeSql(execSql).print();
}
}

2,使用 WindowingTVF 实现
(1)Windowing TVF 这种方式其实就是把滑动窗口定义在了数据源的 TABLE 子句中。此时只需要在 GROUP BY 子句中直接对 window_start、window_end、以及业务字段进行分组即可,这样就可以对指定滑动时间窗口内的数据进行分组聚合了。
- 下面是 Scala 语言代码:
import java.time.ZoneId
import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
object HopWindowByWindowingTVFSQL {
def main(args: Array[String]): Unit = {
//创建执行环境
val settings = EnvironmentSettings
.newInstance()
//指定执行模式,支持inBatchMode和inStreamingMode
.inStreamingMode()
.build()
val tEnv = TableEnvironment.create(settings)
//指定国内的时区
tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
//设置全局并行度为1
tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")
//创建输入表
val inTableSql =
"""
|CREATE TABLE orders_source (
| order_id BIGINT,
| order_type STRING,
| price DECIMAL(10,2),
| -- 定义一个时间字段,使用数据处理时间
| order_time AS PROCTIME()
|) WITH (
| 'connector' = 'datagen',
| 'rows-per-second' = '1',
| 'fields.order_type.length' = '1'
|)
|""".stripMargin
tEnv.executeSql(inTableSql)
//业务逻辑
val execSql2 =
"""
|SELECT
| order_type,
| COUNT(*) AS order_cnt,
| SUM(price) AS price_sum,
| -- 窗口开始时间
| window_start,
| -- 窗口结束时间
| window_end
|FROM TABLE(-- 把HOP WINDOW的信息定义在数据源的TABLE子句中
| HOP(
| TABLE orders_source, -- 指定输入表名称
| DESCRIPTOR(order_time), -- 指定时间字段
| INTERVAL '5' SECOND, -- 指定滑动窗口步长
| INTERVAL '10' SECOND -- 指定滑动窗口的大小
| )
|)
|GROUP BY
| window_start,
| window_end,
| order_type
|""".stripMargin
tEnv.executeSql(execSql2).print()
}
}
- 下面是 Java 语言代码:
import java.time.ZoneId;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
public class HopWindowByWindowingTVFSQLJava {
public static void main(String[] args) {
// 创建执行环境
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
// 指定执行模式,支持inBatchMode和inStreamingMode
.inStreamingMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// 指定国内的时区
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
// 设置全局并行度为1
tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM.key(),"1");
// 创建输入表
String inTableSql = ""
+ "CREATE TABLE orders_source (\n"
+ " order_id BIGINT,\n"
+ " order_type STRING,\n"
+ " price DECIMAL(10,2),\n"
+ " -- 定义一个时间字段,使用数据处理时间\n"
+ " order_time AS PROCTIME()\n"
+ ") WITH (\n"
+ " 'connector' = 'datagen',\n"
+ " 'rows-per-second' = '1',\n"
+ " 'fields.order_type.length' = '1'\n"
+ ")";
tEnv.executeSql(inTableSql);
// 业务逻辑
String execSql2 =
"SELECT " +
" order_type, \n" +
" COUNT(*) AS order_cnt, \n" +
" SUM(price) AS price_sum, \n" +
" -- 窗口开始时间 \n" +
" window_start, \n" +
" -- 窗口结束时间 \n" +
" window_end \n" +
"FROM TABLE( \n" +
" -- 把HOP WINDOW的信息定义在数据源的TABLE子句中 \n" +
" HOP( \n" +
" TABLE orders_source, -- 指定输入表名称 \n" +
" DESCRIPTOR(order_time), -- 指定时间字段 \n" +
" INTERVAL '5' SECOND, -- 指定滑动窗口步长 \n" +
" INTERVAL '10' SECOND -- 指定滑动窗口的大小 \n" +
" ) \n" +
") \n" +
"GROUP BY \n" +
" window_start, \n" +
" window_end, \n" +
" order_type";
tEnv.executeSql(execSql2).print();
}
}

附:滚动窗口 + Watermark
1,样例代码
(1)这里以滚动窗口为例,使用事件时间 + Watermark 解决实时数据乱序问题。下面是 Scala 语言代码:
注意:正常情况下事件时间 order_time 的值需要从原始数据中解析获取,在这里为了演示方便直接使用 CURRENT_TIMESTAMP 时间函数自动生成了。
import java.time.ZoneId
import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
object TumbleWindowByWindowingTVFUseWatermarkSQL {
def main(args: Array[String]): Unit = {
//创建执行环境
val settings = EnvironmentSettings
.newInstance()
//指定执行模式,支持inBatchMode和inStreamingMode
.inStreamingMode()
.build()
val tEnv = TableEnvironment.create(settings)
//指定国内的时区
tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
//设置全局并行度为1
tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")
//创建输入表
val inTableSql =
"""
|CREATE TABLE orders_source (
| order_id BIGINT,
| order_type STRING,
| price DECIMAL(10,2),
| -- 定义一个时间字段,作为事件时间
| order_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP_LTZ(3)),
| -- 设置Watermark
| WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
|) WITH (
| 'connector' = 'datagen',
| 'rows-per-second' = '1',
| 'fields.order_type.length' = '1'
|)
|""".stripMargin
tEnv.executeSql(inTableSql)
//业务逻辑
val execSql2 =
"""
|SELECT
| order_type,
| COUNT(*) AS order_cnt,
| SUM(price) AS price_sum,
| -- 窗口开始时间
| window_start,
| -- 窗口结束时间
| window_end
|FROM TABLE(-- 把TUMBLE WINDOW的信息定义在数据源的TABLE子句中
| TUMBLE(
| TABLE orders_source, -- 指定输入表名称
| DESCRIPTOR(order_time), -- 指定时间字段
| INTERVAL '10' SECOND -- 指定滚动窗口的大小
| )
|)
|GROUP BY
| window_start,
| window_end,
| order_type
|""".stripMargin
tEnv.executeSql(execSql2).print()
}
}
(2)下面是使用 Java 语言实现同样的功能:
import java.time.ZoneId;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
public class TumbleWindowByWindowingTVFUseWatermarkSQLJava {
public static void main(String[] args) {
// 创建执行环境
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
// 指定执行模式,支持inBatchMode和inStreamingMode
.inStreamingMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// 指定国内的时区
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
// 设置全局并行度为1
tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM.key(),"1");
// 创建输入表
String inTableSql =
"CREATE TABLE orders_source ( \n" +
" order_id BIGINT, \n" +
" order_type STRING, \n" +
" price DECIMAL(10,2), \n" +
" -- 定义一个时间字段,作为事件时间 \n" +
" order_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP_LTZ(3)), \n" +
" -- 设置Watermark \n" +
" WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND \n" +
") WITH ( \n" +
" 'connector' = 'datagen', \n" +
" 'rows-per-second' = '1', \n" +
" 'fields.order_type.length' = '1' \n" +
")";
tEnv.executeSql(inTableSql);
// 业务逻辑
String execSql2 =
"SELECT \n" +
" order_type, \n" +
" COUNT(*) AS order_cnt, \n" +
" SUM(price) AS price_sum, \n" +
" -- 窗口开始时间 \n" +
" window_start, \n" +
" -- 窗口结束时间 \n" +
" window_end \n" +
"FROM TABLE( \n" +
" -- 把TUMBLE WINDOW的信息定义在数据源的TABLE子句中 \n" +
" TUMBLE( \n" +
" TABLE orders_source, -- 指定输入表名称 \n" +
" DESCRIPTOR(order_time), -- 指定时间字段 \n" +
" INTERVAL '10' SECOND -- 指定滚动窗口的大小 \n" +
" ) \n" +
") \n" +
"GROUP BY \n" +
" window_start, \n" +
" window_end, \n" +
" order_type";
tEnv.executeSql(execSql2).print();
}
}
2,运行测试
执行代码,同时打开系统的时间组件。可以看到窗口会延迟 5 秒钟触发,因为目前 Watermark 设置允许数据最大乱序时间是 5 秒。

全部评论(0)