返回 导航

大数据

hangge.com

Flink SQL - 窗口聚合使用详解(滚动窗口、滑动窗口、Watermark)

作者:hangge | 2025-04-23 08:30
    窗口聚合语句可以使用 Group Window Aggregation 或者 Windowing TVFGroup Window AggregationFlink1.13 版本之前提供的解决方案,这个方案在 Flink1.13 版本被标记为过时了,不建议使用了。从 Flink1.13 版本开始,官方建议使用 Windowing TVF。下面我将通过样例演示窗口聚合语句的具体使用。
注意:目前 Windowing TVF 只支持流处理任务,针对批处理任务还是需要使用 Group Window Aggregation

一、滚动窗口

1,使用 GroupWindowAggregation 实现

(1)Group Window Aggregation 这种方式其实就是把滚动窗口定义在了 GROUP BY 子句中,相当于按照窗口进行分组,这样就可以在 SELECT 语句后面实现聚合操作了。
  • 下面是 Scala 语言实现代码:
import java.time.ZoneId
import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object TumbleWindowByGroupWindowAggSQL {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      //指定执行模式,支持inBatchMode和inStreamingMode
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //指定国内的时区
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))

    //设置全局并行度为1
    tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")

    //创建输入表
    val inTableSql =
      """
        |CREATE TABLE orders_source(
        |    order_id    BIGINT,
        |    order_type  STRING,
        |    price       DECIMAL(10,2),
        |    -- 定义一个时间字段,使用数据处理时间
        |    order_time  AS PROCTIME()
        |) WITH (
        |    'connector' = 'datagen',
        |    'rows-per-second' = '1',
        |    'fields.order_type.length' = '1'
        |)
        |""".stripMargin
    tEnv.executeSql(inTableSql)

    //业务逻辑
    val execSql =
      """
        |SELECT
        |  order_type,
        |  COUNT(*) AS order_cnt,
        |  SUM(price) AS price_sum,
        |  -- 窗口开始时间
        |  TUMBLE_START(order_time,INTERVAL '10' SECOND) AS window_start,
        |  -- 窗口结束时间
        |  TUMBLE_END(order_time,INTERVAL '10' SECOND) AS window_end
        |FROM orders_source
        |GROUP BY
        |  -- TUMBLE代表是滚动窗口,第一个参数是时间字段,第二个参数是滚动窗口大小(10秒)
        |  TUMBLE(order_time,INTERVAL '10' SECOND),
        |  order_type
        |""".stripMargin
    tEnv.executeSql(execSql).print()
  }
}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import java.time.ZoneId;

public class TumbleWindowByGroupWindowAggSQLJava {
    public static void main(String[] args) {
        // 创建执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                // 指定执行模式,支持inBatchMode和inStreamingMode
                .inStreamingMode()
                .build();

        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 指定国内的时区
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));

        // 设置全局并行度为1
        tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM.key(),"1");

        // 创建输入表
        String inTableSql = ""
                + "CREATE TABLE orders_source (\n"
                + "    order_id BIGINT,\n"
                + "    order_type STRING,\n"
                + "    price DECIMAL(10,2),\n"
                + "    -- 定义一个时间字段,使用数据处理时间\n"
                + "    order_time AS PROCTIME()\n"
                + ") WITH (\n"
                + "    'connector' = 'datagen',\n"
                + "    'rows-per-second' = '1',\n"
                + "    'fields.order_type.length' = '1'\n"
                + ")";
        tEnv.executeSql(inTableSql);

        // 业务逻辑
        String execSql = ""
                + "SELECT\n"
                + "  order_type,\n"
                + "  COUNT(*) AS order_cnt,\n"
                + "  SUM(price) AS price_sum,\n"
                + "  -- 窗口开始时间\n"
                + "  TUMBLE_START(order_time, INTERVAL '10' SECOND) AS window_start,\n"
                + "  -- 窗口结束时间\n"
                + "  TUMBLE_END(order_time, INTERVAL '10' SECOND) AS window_end\n"
                + "FROM orders_source\n"
                + "GROUP BY\n"
                + "  -- TUMBLE代表是滚动窗口,第一个参数是时间字段,第二个参数是滚动窗口大小(10秒)\n"
                + "  TUMBLE(order_time, INTERVAL '10' SECOND),\n"
                + "  order_type";
        tEnv.executeSql(execSql).print();
    }
}

(2)执行代码,控制台输出结果如下。注意等待查看第二次窗口触发时的结果,可以看到 order_cnt 字段的值累加等于 10,因为每秒产生 1 条数据,所以一个窗口内会产生 10 条数据。

2,使用 WindowingTVF 实现

(1)Windowing TVF 这种方式其实就是把滚动窗口定义在了数据源的 TABLE 子句中。此时只需要在 GROUP BY 子句中直接对 window_startwindow_end、以及业务字段进行分组即可,这样就可以对指定滚动时间窗口内的数据进行分组聚合了。
  • 下面是 Scala 语言代码:
注意:这里面的 execSql1 中的 SQL 语句是为了验证 WindowTVF 中都包含哪些字段。
import java.time.ZoneId
import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object TumbleWindowByWindowingTVFSQL {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      //指定执行模式,支持inBatchMode和inStreamingMode
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //指定国内的时区
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))

    //设置全局并行度为1
    tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")

    //创建输入表
    val inTableSql =
      """
        |CREATE TABLE orders_source (
        |    order_id     BIGINT,
        |    order_type   STRING,
        |    price        DECIMAL(10,2),
        |    -- 定义一个时间字段,使用数据处理时间
        |    order_time   AS PROCTIME()
        |) WITH (
        |  'connector' = 'datagen',
        |  'rows-per-second' = '1',
        |  'fields.order_type.length' = '1'
        |)
        |""".stripMargin
    tEnv.executeSql(inTableSql)

    //验证WindowingTVF中都包含哪些字段
    val execSql1 =
      """
        |SELECT
        |  *
        |FROM TABLE(-- 把TUMBLE WINDOW的信息定义在数据源的TABLE子句中
        |  TUMBLE(
        |    TABLE orders_source, -- 指定输入表名称
        |    DESCRIPTOR(order_time), -- 指定时间字段
        |    INTERVAL '10' SECOND -- 指定滚动窗口的大小
        |  )
        |)
        |""".stripMargin
    //tEnv.executeSql(execSql1).print()

    //业务逻辑
    val execSql2 =
      """
        |SELECT
        |  order_type,
        |  COUNT(*) AS order_cnt,
        |  SUM(price) AS price_sum,
        |  -- 窗口开始时间
        |  window_start,
        |  -- 窗口结束时间
        |  window_end
        |FROM TABLE(-- 把TUMBLE WINDOW的信息定义在数据源的TABLE子句中
        |  TUMBLE(
        |    TABLE orders_source, -- 指定输入表名称
        |    DESCRIPTOR(order_time), -- 指定时间字段
        |    INTERVAL '10' SECOND -- 指定滚动窗口的大小
        |  )
        |)
        |GROUP BY
        |  window_start,
        |  window_end,
        |  order_type
        |""".stripMargin

    tEnv.executeSql(execSql2).print()
  }
}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

import java.time.ZoneId;

public class TumbleWindowByWindowingTVFSQLJava {
    public static void main(String[] args) {
        // 创建执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                // 指定执行模式,支持 inBatchMode 和 inStreamingMode
                .inStreamingMode()
                .build();

        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 指定国内的时区
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));

        // 设置全局并行度为1
        tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM.key(),"1");

        // 创建输入表
        String inTableSql = "" +
                "CREATE TABLE orders_source (\n" +
                "    order_id     BIGINT,\n" +
                "    order_type   STRING,\n" +
                "    price        DECIMAL(10,2),\n" +
                "    -- 定义一个时间字段,使用数据处理时间\n" +
                "    order_time   AS PROCTIME()\n" +
                ") WITH (\n" +
                "  'connector' = 'datagen',\n" +
                "  'rows-per-second' = '1',\n" +
                "  'fields.order_type.length' = '1'\n" +
                ")";
        tEnv.executeSql(inTableSql);

        // 验证 WindowingTVF 中都包含哪些字段
        String execSql1 = "" +
                "SELECT\n" +
                "  *\n" +
                "FROM TABLE(-- 把 TUMBLE WINDOW 的信息定义在数据源的 TABLE 子句中\n" +
                "  TUMBLE(\n" +
                "    TABLE orders_source, -- 指定输入表名称\n" +
                "    DESCRIPTOR(order_time), -- 指定时间字段\n" +
                "    INTERVAL '10' SECOND -- 指定滚动窗口的大小\n" +
                "  )\n" +
                ")";
        // tEnv.executeSql(execSql1).print();

        // 业务逻辑
        String execSql2 = "" +
                "SELECT\n" +
                "  order_type,\n" +
                "  COUNT(*) AS order_cnt,\n" +
                "  SUM(price) AS price_sum,\n" +
                "  -- 窗口开始时间\n" +
                "  window_start,\n" +
                "  -- 窗口结束时间\n" +
                "  window_end\n" +
                "FROM TABLE(-- 把 TUMBLE WINDOW 的信息定义在数据源的 TABLE 子句中\n" +
                "  TUMBLE(\n" +
                "    TABLE orders_source, -- 指定输入表名称\n" +
                "    DESCRIPTOR(order_time), -- 指定时间字段\n" +
                "    INTERVAL '10' SECOND -- 指定滚动窗口的大小\n" +
                "  )\n" +
                ")\n" +
                "GROUP BY\n" +
                "  window_start,\n" +
                "  window_end,\n" +
                "  order_type";
        tEnv.executeSql(execSql2).print();
    }
}

(2)执行代码,控制台输出结果如下。注意等待查看第二次窗口触发时的结果,可以看到 order_cnt 字段的值累加等于 10,因为每秒产生 1 条数据,所以一个窗口内会产生 10 条数据。
提示:可以发现 Windowing TVFGroupWindowAggregation 方式产生的结果是一样的。相对比而言,Windowing TVF 这种方式使用起来更加简洁,特别是在 SELECT 中获取 window_startwindow_end 字段的时候。 并且 Windowing TVF 中还可以支持一些 GroupWindowAggregation 不支持的高级用法,例如 Grouping Sets,大家感兴趣的话可以查阅资料了解一下。 所以目前官方推荐使用 Windowing TVF,大家重点掌握这种方式的使用即可。

二、滑动窗口

1,使用 GroupWindowAggregation 实现

(1)Group Window Aggregation 这种方式其实就是把滑动窗口定义在了 GROUP BY 子句中,该样例窗口的大小为 10 秒,滑动步长为 5 秒。
  • 下面是 Scala 语言实现代码:
import java.time.ZoneId
import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object HopWindowByGroupWindowAggSQLJava {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      //指定执行模式,支持inBatchMode和inStreamingMode
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //指定国内的时区
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))

    //设置全局并行度为1
    tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")

    //创建输入表
    val inTableSql =
      """
        |CREATE TABLE orders_source(
        |    order_id    BIGINT,
        |    order_type  STRING,
        |    price       DECIMAL(10,2),
        |    -- 定义一个时间字段,使用数据处理时间
        |    order_time  AS PROCTIME()
        |) WITH (
        |    'connector' = 'datagen',
        |    'rows-per-second' = '1',
        |    'fields.order_type.length' = '1'
        |)
        |""".stripMargin
    tEnv.executeSql(inTableSql)

    //业务逻辑
    val execSql =
      """
        |SELECT
        |  order_type,
        |  COUNT(*) AS order_cnt,
        |  SUM(price) AS price_sum,
        |  -- 窗口开始时间
        |  HOP_START(order_time,INTERVAL '5' SECOND,INTERVAL '10' SECOND) AS window_start,
        |  -- 窗口结束时间
        |  HOP_END(order_time,INTERVAL '5' SECOND,INTERVAL '10' SECOND) AS window_end
        |FROM orders_source
        |GROUP BY
        |  -- HOP代表是滑动窗口
        |  -- 第一个参数是时间字段
        |  -- 第二个参数是滑动窗口的滑动步长(5秒)
        |  -- 第三个参数是滑动窗口大小(10秒)
        |  HOP(order_time,INTERVAL '5' SECOND,INTERVAL '10' SECOND),
        |  order_type
        |""".stripMargin
    tEnv.executeSql(execSql).print()
  }
}
import java.time.ZoneId;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HopWindowByGroupWindowAggSQLJava {
    public static void main(String[] args) {
        // 创建执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                // 指定执行模式,支持inBatchMode和inStreamingMode
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 指定国内的时区
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));

        // 设置全局并行度为1
        tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM.key(),"1");

        // 创建输入表
        String inTableSql =
                "CREATE TABLE orders_source( \n" +
                        "    order_id    BIGINT, \n" +
                        "    order_type  STRING, \n" +
                        "    price       DECIMAL(10,2), \n" +
                        "    -- 定义一个时间字段,使用数据处理时间 \n" +
                        "    order_time  AS PROCTIME() \n" +
                        ") WITH ( \n" +
                        "    'connector' = 'datagen', \n" +
                        "    'rows-per-second' = '1', \n" +
                        "    'fields.order_type.length' = '1' \n" +
                        ")";
        tEnv.executeSql(inTableSql);

        // 业务逻辑
        String execSql =
                "SELECT " +
                        "  order_type, \n" +
                        "  COUNT(*) AS order_cnt, \n" +
                        "  SUM(price) AS price_sum, \n" +
                        "  -- 窗口开始时间 \n" +
                        "  HOP_START(order_time, INTERVAL '5' SECOND, INTERVAL '10' SECOND) " +
                        "AS window_start, \n" +
                        "  -- 窗口结束时间 \n" +
                        "  HOP_END(order_time, INTERVAL '5' SECOND, INTERVAL '10' SECOND) " +
                        "AS window_end \n" +
                        "FROM orders_source \n" +
                        "GROUP BY \n" +
                        "  -- HOP代表是滑动窗口 \n" +
                        "  -- 第一个参数是时间字段 \n" +
                        "  -- 第二个参数是滑动窗口的滑动步长(5秒) \n" +
                        "  -- 第三个参数是滑动窗口大小(10秒) \n" +
                        "  HOP(order_time, INTERVAL '5' SECOND, INTERVAL '10' SECOND), \n" +
                        "  order_type";
        tEnv.executeSql(execSql).print();
    }
}

(2)执行代码,控制台输出结果如下。注意等待查看第二次窗口触发时的结果,可以看到 order_cnt 字段的值累加等于 10,因为每秒产生 1 条数据,所以一个窗口内会产生 10 条数据。

2,使用 WindowingTVF 实现

(1)Windowing TVF 这种方式其实就是把滑动窗口定义在了数据源的 TABLE 子句中。此时只需要在 GROUP BY 子句中直接对 window_startwindow_end、以及业务字段进行分组即可,这样就可以对指定滑动时间窗口内的数据进行分组聚合了。
  • 下面是 Scala 语言代码:
import java.time.ZoneId
import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object HopWindowByWindowingTVFSQL {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      //指定执行模式,支持inBatchMode和inStreamingMode
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //指定国内的时区
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))

    //设置全局并行度为1
    tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")

    //创建输入表
    val inTableSql =
      """
        |CREATE TABLE orders_source (
        |    order_id     BIGINT,
        |    order_type   STRING,
        |    price        DECIMAL(10,2),
        |    -- 定义一个时间字段,使用数据处理时间
        |    order_time   AS PROCTIME()
        |) WITH (
        |  'connector' = 'datagen',
        |  'rows-per-second' = '1',
        |  'fields.order_type.length' = '1'
        |)
        |""".stripMargin
    tEnv.executeSql(inTableSql)

    //业务逻辑
    val execSql2 =
      """
        |SELECT
        |  order_type,
        |  COUNT(*) AS order_cnt,
        |  SUM(price) AS price_sum,
        |  -- 窗口开始时间
        |  window_start,
        |  -- 窗口结束时间
        |  window_end
        |FROM TABLE(-- 把HOP WINDOW的信息定义在数据源的TABLE子句中
        |  HOP(
        |    TABLE orders_source, -- 指定输入表名称
        |    DESCRIPTOR(order_time), -- 指定时间字段
        |    INTERVAL '5' SECOND, -- 指定滑动窗口步长
        |    INTERVAL '10' SECOND -- 指定滑动窗口的大小
        |  )
        |)
        |GROUP BY
        |  window_start,
        |  window_end,
        |  order_type
        |""".stripMargin

    tEnv.executeSql(execSql2).print()
  }
}
import java.time.ZoneId;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HopWindowByWindowingTVFSQLJava {
    public static void main(String[] args) {
        // 创建执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                // 指定执行模式,支持inBatchMode和inStreamingMode
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 指定国内的时区
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));

        // 设置全局并行度为1
        tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM.key(),"1");

        // 创建输入表
        String inTableSql = ""
                + "CREATE TABLE orders_source (\n"
                + "    order_id BIGINT,\n"
                + "    order_type STRING,\n"
                + "    price DECIMAL(10,2),\n"
                + "    -- 定义一个时间字段,使用数据处理时间\n"
                + "    order_time AS PROCTIME()\n"
                + ") WITH (\n"
                + "    'connector' = 'datagen',\n"
                + "    'rows-per-second' = '1',\n"
                + "    'fields.order_type.length' = '1'\n"
                + ")";
        tEnv.executeSql(inTableSql);

        // 业务逻辑
        String execSql2 =
                "SELECT " +
                        "  order_type, \n" +
                        "  COUNT(*) AS order_cnt, \n" +
                        "  SUM(price) AS price_sum, \n" +
                        "  -- 窗口开始时间 \n" +
                        "  window_start, \n" +
                        "  -- 窗口结束时间 \n" +
                        "  window_end \n" +
                        "FROM TABLE( \n" +
                        "  -- 把HOP WINDOW的信息定义在数据源的TABLE子句中 \n" +
                        "  HOP( \n" +
                        "    TABLE orders_source, -- 指定输入表名称 \n" +
                        "    DESCRIPTOR(order_time), -- 指定时间字段 \n" +
                        "    INTERVAL '5' SECOND, -- 指定滑动窗口步长 \n" +
                        "    INTERVAL '10' SECOND -- 指定滑动窗口的大小 \n" +
                        "  ) \n" +
                        ") \n" +
                        "GROUP BY \n" +
                        "  window_start, \n" +
                        "  window_end, \n" +
                        "  order_type";
        tEnv.executeSql(execSql2).print();
    }
}

(2)执行代码,控制台输出结果如下,可以发现 Windowing TVFGroupWindowAggregation 方式产生的结果是一样的。注意等待查看第二次窗口触发时的结果,可以看到 order_cnt 字段的值累加等于 10,因为每秒产生 1 条数据,所以一个窗口内会产生 10 条数据。

附:滚动窗口 + Watermark

1,样例代码

(1)这里以滚动窗口为例,使用事件时间 + Watermark 解决实时数据乱序问题。下面是 Scala 语言代码:
注意:正常情况下事件时间 order_time 的值需要从原始数据中解析获取,在这里为了演示方便直接使用 CURRENT_TIMESTAMP 时间函数自动生成了。
import java.time.ZoneId
import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object TumbleWindowByWindowingTVFUseWatermarkSQL {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      //指定执行模式,支持inBatchMode和inStreamingMode
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //指定国内的时区
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))

    //设置全局并行度为1
    tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")

    //创建输入表
    val inTableSql =
      """
        |CREATE TABLE orders_source (
        |    order_id     BIGINT,
        |    order_type   STRING,
        |    price        DECIMAL(10,2),
        |    -- 定义一个时间字段,作为事件时间
        |    order_time   AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP_LTZ(3)),
        |    -- 设置Watermark
        |     WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
        |) WITH (
        |  'connector' = 'datagen',
        |  'rows-per-second' = '1',
        |  'fields.order_type.length' = '1'
        |)
        |""".stripMargin
    tEnv.executeSql(inTableSql)

    //业务逻辑
    val execSql2 =
      """
        |SELECT
        |  order_type,
        |  COUNT(*) AS order_cnt,
        |  SUM(price) AS price_sum,
        |  -- 窗口开始时间
        |  window_start,
        |  -- 窗口结束时间
        |  window_end
        |FROM TABLE(-- 把TUMBLE WINDOW的信息定义在数据源的TABLE子句中
        |  TUMBLE(
        |    TABLE orders_source, -- 指定输入表名称
        |    DESCRIPTOR(order_time), -- 指定时间字段
        |    INTERVAL '10' SECOND -- 指定滚动窗口的大小
        |  )
        |)
        |GROUP BY
        |  window_start,
        |  window_end,
        |  order_type
        |""".stripMargin
    tEnv.executeSql(execSql2).print()
  }
}

(2)下面是使用 Java 语言实现同样的功能:
import java.time.ZoneId;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TumbleWindowByWindowingTVFUseWatermarkSQLJava {
    public static void main(String[] args) {
        // 创建执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                // 指定执行模式,支持inBatchMode和inStreamingMode
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 指定国内的时区
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));

        // 设置全局并行度为1
        tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM.key(),"1");

        // 创建输入表
        String inTableSql =
                "CREATE TABLE orders_source ( \n" +
                        "    order_id     BIGINT, \n" +
                        "    order_type   STRING, \n" +
                        "    price        DECIMAL(10,2), \n" +
                        "    -- 定义一个时间字段,作为事件时间 \n" +
                        "    order_time   AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP_LTZ(3)), \n" +
                        "    -- 设置Watermark \n" +
                        "    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND \n" +
                        ") WITH ( \n" +
                        "  'connector' = 'datagen', \n" +
                        "  'rows-per-second' = '1', \n" +
                        "  'fields.order_type.length' = '1' \n" +
                        ")";
        tEnv.executeSql(inTableSql);

        // 业务逻辑
        String execSql2 =
                "SELECT \n" +
                        "  order_type, \n" +
                        "  COUNT(*) AS order_cnt, \n" +
                        "  SUM(price) AS price_sum, \n" +
                        "  -- 窗口开始时间 \n" +
                        "  window_start, \n" +
                        "  -- 窗口结束时间 \n" +
                        "  window_end \n" +
                        "FROM TABLE( \n" +
                        "  -- 把TUMBLE WINDOW的信息定义在数据源的TABLE子句中 \n" +
                        "  TUMBLE( \n" +
                        "    TABLE orders_source, -- 指定输入表名称 \n" +
                        "    DESCRIPTOR(order_time), -- 指定时间字段 \n" +
                        "    INTERVAL '10' SECOND -- 指定滚动窗口的大小 \n" +
                        "  ) \n" +
                        ") \n" +
                        "GROUP BY \n" +
                        "  window_start, \n" +
                        "  window_end, \n" +
                        "  order_type";
        tEnv.executeSql(execSql2).print();
    }
}

2,运行测试

    执行代码,同时打开系统的时间组件。可以看到窗口会延迟 5 秒钟触发,因为目前 Watermark 设置允许数据最大乱序时间是 5 秒。
评论

全部评论(0)

回到顶部