返回 导航

大数据

hangge.com

Flink SQL - 常规列、元数据列、计算列详解(附:定义事件时间)

作者:hangge | 2025-04-22 09:23
    列其实就是字段。 Flink SQL 中的列可以大致划分为常规列(物理列)、元数据列,和计算列。本文将通过样例详细介绍下这几种列。

一、常规列(物理列)

1,基本介绍

常规列定义了物理数据中字段的名称、类型和顺序。

2,使用样例

CREATE TABLE T1(
    id INT,
    name STRING
) WITH (
    ...
)

二、元数据列

1,基本介绍

(1)元数据列是 FlinkSQL 特有的,它允许访问数据源本身的元数据,并不是所有的数据源都有元数据,常见的 kafka 数据源中是有元数据的,所以在基于 Kafka 定义表的时候,可以定义元数据列。

(2)元数据列通过 METADATA 关键字进行定义。

(3)在表结构中,元数据列是可选的,不是必须要有的,如果我们想从元数据中获取一些信息,才需要在表结构中定义元数据列。

(4)元数据列可以用于后续数据的处理,或者写入到目标表中,可以做为表中的正常列去使用,只不过这个列的值的来源和常规列不一样。

2,Kafka 支持的元数据列

(1)下表列出了所有 Kafka 支持的元数据列:
  • topic:代表是数据所在的 topic 的名称
  • partition:代表的是数据所在的 topic 的分区编号
  • headers:代表数据的消息头。
  • leader-epoch:代表的是 kafka 中数据的版本号的概念。
  • offset:代表数据的偏移量。
  • timestamp:代表数据写入 kafka 的时间,这个时间是 kafka 自动生成的。
  • timestamp-type:默认值是 CreateTime,代表的是 timestamp 的类型,表示 timestamp 是在数据写入的时候生成的。

(2)注意:针对这个图中的 R/W 这个列我们来详细解释一下
  • 默认情况下,Flink SQL 引擎认为元数据列是可以读取也可以写入的。但是有些外部存储系统的元数据列是只能用于读取,不能写入的。
  • 这个图中的 R/W 列中,R 表示读,W 表示写。
  • 如果只有 R 表示只读。针对只读的字段,在输入表中可以使用,在输出表中是不能使用的。
  • 针对这些只读的字段,需要在后面指定一个关键字 VIRTUALVIRTUAL 关键字用来标识某个元数据列不写入到外部存储中(不持久化)。

3,Kafka 元数据列 timestamp 使用说明

(1)我们可以使用元数据列从 Kafka 数据中读取它自带的时间戳,然后在 Flink SQL 中使用这个时间戳,进行一些基于时间的窗口操作。
注意:这里所说的 kafka 中的时间戳不是数据中的某个时间戳字段,而是数据在写入 Kafka 里面的时候,Kafka 框架给这条数据打上的时间戳标记。

(2)针对 Kafka 中数据自带的时间戳,在 FlinkSQL 中可以这样使用:
  • k_timestamp:当前字段名称,我们自己定义的
  • TIMESTAMP_LTZ(3):代表当前这个字段的类型
  • METADATA:代表这个字段是一个元数据列
  • FROM 'timestamp':表示 k_timestamp 字段的值会从 kafka 这个 connectortimestamp 这个元数据字段中获取。
CREATE TABLE T1(
    id INT,
    name STRING,
    k_timestamp TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' 
)WITH (
    'connector' = 'kafka'
    ...
)

(3)注意:如果自定义的字段名称和 Connector 中定义的元数据字段的名称一样,那么 METADATA 后面的 FROM xxx 子句是可以被省略的。例如上面语句可以简写为如下语句:
CREATE TABLE T1(
    id INT,
    name STRING,
    `timestamp` TIMESTAMP_LTZ(3) METADATA 
)WITH (
    'connector' = 'kafka'
    ...
)

4,VIRTUAL 关键字使用样例

(1)前面说过针对元数据列中那些只读的字段,需要在后面指定一个关键字 VIRTUALVIRTUAL 关键字用来标识某个元数据列不写入到外部存储中(不持久化)。

(2)下面是具体例子:
  • offset BIGINT METADATA [...] VIRTUAL 中其实省略了 FROM xxx 子句,因为这个字段的名称和 Kafka 中元数据列的名称一致。
  • 由于 offset 属于 flink sql 中的保留关键字,并且也和元数据列的名称一样,所以在使用的时候需要给这个字段名称加上一对反引号,否则语法解析会报错,并且在 SELECT 语句中使用这个字段的时候,也需要添加反引号。
CREATE TABLE T1(
    id INT,
    name STRING,
    `offset` BIGINT METADATA [...] VIRTUAL 
)WITH (
    'connector' = 'kafka'
    ...
)

(3)注意针对这个 T1 表而言:
  • 如果它作为输入表,那么这个表中是包含 offset 字段的。
  • 如果它作为输出表,那么这个表中是不包含 offset 字段的,相当于没有定义 offset 字段。
  • 所以在把 T1 作为输出表的时候,我们在组装 INSERT INTO 语句时,就不要在 INSERT INTO 语句中指定 offset 字段了,否则执行是会报错的。

5,完整的使用 Kafka 中元数据列样例

(1)这里我们基于 Kafka 这个 connector 定义一个带有元数据列的输入表,下面是 Scala 语言代码:
注意:在定义表的时候,针对 kafka 中的元数据字段名称,以及 Flink SQL 内置关键字都需要添加反引号,否则 SQL 在编译的时候会报错。
import java.time.ZoneId
import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object KafkaMetadataColumnSQL {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      //指定执行模式,支持inBatchMode和inStreamingMode
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //指定国内的时区
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))

    //设置全局并行度为1
    tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")

    //创建输入表
    val inTableSql =
      """
        |CREATE TABLE kafka_source(
        |  name STRING,
        |  age INT,
        |  `topic` STRING METADATA VIRTUAL,
        |  `partition` INT METADATA  VIRTUAL,
        |  `headers` MAP<STRING,STRING> METADATA,
        |  `leader-epoch` INT METADATA VIRTUAL,
        |  `offset` BIGINT METADATA VIRTUAL,
        |  k_timestamp TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
        |  `timestamp-type` STRING METADATA VIRTUAL
        |)WITH(
        |  'connector' = 'kafka',
        |  'topic' = 'dt003',
        |  'properties.bootstrap.servers' = '192.168.121.128:9092',
        |  'properties.group.id' = 'gid-sql-2',
        |  'scan.startup.mode' = 'group-offsets',
        |  'properties.auto.offset.reset' = 'latest',
        |  'format' = 'json',
        |  'json.fail-on-missing-field' = 'false',
        |  'json.ignore-parse-errors' = 'true'
        |)
        |""".stripMargin
    tEnv.executeSql(inTableSql)

    //查看输入表中的数据
    val querySQL =
      """
        |SELECT
        | name,
        | age,
        | `topic`,
        | `partition`,
        | `headers`,
        | `leader-epoch`,
        | `offset`,
        | k_timestamp,
        | `timestamp-type`
        |FROM kafka_source
        |""".stripMargin
    tEnv.executeSql(querySQL).print()
  }
}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

import java.time.ZoneId;

public class KafkaMetadataColumnSQLJava {
    public static void main(String[] args) {
        // 创建执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                // 指定执行模式,支持 inBatchMode 和 inStreamingMode
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 指定国内的时区
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));

        // 设置全局并行度为1
        Configuration configuration = new Configuration();
        configuration.setString(CoreOptions.DEFAULT_PARALLELISM.key(), "1");
        tEnv.getConfig().addConfiguration(configuration);

        // 创建输入表
        String inTableSql = "CREATE TABLE kafka_source( \n" +
                "  name STRING, \n" +
                "  age INT, \n" +
                "  `topic` STRING METADATA VIRTUAL, \n" +
                "  `partition` INT METADATA VIRTUAL, \n" +
                "  `headers` MAP<STRING, STRING> METADATA, \n" +
                "  `leader-epoch` INT METADATA VIRTUAL, \n" +
                "  `offset` BIGINT METADATA VIRTUAL, \n" +
                "  k_timestamp TIMESTAMP_LTZ(3) METADATA FROM 'timestamp', \n" +
                "  `timestamp-type` STRING METADATA VIRTUAL \n" +
                ") WITH ( \n" +
                "  'connector' = 'kafka', \n" +
                "  'topic' = 'dt003', \n" +
                "  'properties.bootstrap.servers' = '192.168.121.128:9092', \n" +
                "  'properties.group.id' = 'gid-sql-2', \n" +
                "  'scan.startup.mode' = 'group-offsets', \n" +
                "  'properties.auto.offset.reset' = 'latest', \n" +
                "  'format' = 'json', \n" +
                "  'json.fail-on-missing-field' = 'false', \n" +
                "  'json.ignore-parse-errors' = 'true' \n" +
                ")";
        tEnv.executeSql(inTableSql);

        // 查看输入表中的数据
        String querySQL = "SELECT \n" +
                "  name, \n" +
                "  age, \n" +
                "  `topic`, \n" +
                "  `partition`, \n" +
                "  `headers`, \n" +
                "  `leader-epoch`, \n" +
                "  `offset`, \n" +
                "  k_timestamp, \n" +
                "  `timestamp-type` \n" +
                "FROM kafka_source";
        tEnv.executeSql(querySQL).print();
    }
}

(2)开始测试,首先我们创建 dt003 这两个 topic
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic dt003

(3)然后我们开启一个基于控制台的生产者:
kafka-console-producer.sh --broker-list localhost:9092 --topic dt003

(4)执行代码。然后手工向 Topic 中生产数据:
{"name":"hangge","age":18}
{"name":"hangge","age":18,"k_timestamp":"1999-01-08 16:42:21.914"}

(5)可以看到控制台输出内容如下。注意:
这里的时间是 Kafka 框架自动生成的,所以使用的是 Kafka 所在服务器的时间,即 2024-12-05 这个日期。
  • 数据源中如果包含的字段名称和我们定义的元数据字段名称重复,数据源中的那个字段会被忽略。

三、计算列

1,基本介绍

计算列是基于指定语法生成的虚拟列,在建表语句中通过表达式来定义。

2,使用样例

(1)下面这个建表语句里面的 cost 这个列就是计算列,它是由一个表达式定义的。表达式里面可以包含列、常量或函数进行组合。
CREATE TABLE T1(
    price INT,
    count INT,
    cost AS  price * count
) WITH (
    ...
)

(2)大家可能在这会有一个疑问,如果我在定义表的时候不使用这个计算列,在查询这个表的时候,在 SELECT 语句中直接使用表达式生成 cost 这个列,这样是不是更加灵活方便呢?
  • 如果只是一个简单的表达式,那么在 SELECT 语句中定义是最简单的。
  • 但是我们一般在用到计算列的时候都是需要对事件时间字段进行一些处理,这样就无法在 SELECT 语句中定义了。具体可以看下面的案例。

附:Flink SQL 中定义事件时间

1,基本介绍

(1)想要在 Flink SQL 中定义事件时间字段,需要通过 Watermark 来定义。
(2)Watermark 需要在 Flink SQL 的建表语句中定义。它的格式如下:
  • rowtime_column_name:表示是一个事件时间字段,这个字段的类型必须是 TIMESTAMP(3)或者 TIMESTAMP_LTZ(3)类型。
  • watermark_strategy_expression:在这里指定 Watermark 的生成策略,一般都是使用 rowtime_column_name 减去固定的时间间隔,这样就相当于指定了最大允许的数据乱序时间。
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

2,使用样例

(1)如果我们想要使用事件时间语义,那么必须要设置事件时间字段和 Watermark 生成策略。下面来看一个 Flink SQL 的建表语句:
  • 这里面的 c_time 其实就是一个普通的时间戳字段,但是下面通过 WATERMARKc_time 进行了定义,所以 c_time 就是一个事件时间字段了。
  • WATERMARK 中的 c_time - INTERVAL '5' SECOND 表示是 WATERMARK 的生成策略。
  • c_time - INTERVAL '5' SECOND 的意思就是 WATERMARK 允许数据最大的乱序时间是 5 秒。
  • 这里面 c_time 字段的类型必须是 TIMESTAMP(3)或者 TIMESTAMP_LTZ(3)类型。
CREATE TABLE T1(
    title STRING,
    c_time TIMESTAMP(3),
    WATERMARK FOR c_time AS c_time - INTERVAL '5' SECOND 
) WITH (
    ...
)

(2)上面样例 SQL 里面 c_time 字段的类型必须是 TIMESTAMP(3)或者 TIMESTAMP_LTZ(3)类型。如果原始数据中的 c_time 字段不是这些数据类型,那么就需要使用计算列进行转化了。具体看下面这个 Flink SQL 的建表语句:
  • 这个表对应的原始数据中自带的时间是一个毫秒级的时间戳,所以在表中定义了一个 ts BIGINT 类型的字段。
  • 但是这个字段无法被声明为事件时间字段,需要进行转换,所以在表中增加了一个 c_time 字段,这个 c_time 字段就是一个计算列了,里面通过 TO_TIMESTAMP_LTZ 函数把 ts 这个毫秒级时间戳转换为了 TIMESTAMP_LTZ(3)类型,这样就可以满足事件时间的要求了。
  • 针对这种需求,在 SELECT 语句中就无法实现了,所以这就是计算列的典型应用场景。
CREATE TABLE T1(
    title STRING,
    ts BIGINT,
    c_time AS TO_TIMESTAMP_LTZ(ts, 3),
    WATERMARK FOR c_time AS c_time - INTERVAL '5' SECOND 
) WITH (
    ...
)
评论

全部评论(0)

回到顶部