返回 导航

大数据

hangge.com

Flink SQL - 常见数据类型详解(附:各种日期格式、日期函数使用样例)

作者:hangge | 2025-04-21 09:04

一、数据类型详解

1,基本介绍

(1)Flink SQL 中支持的数据类型大致可以分为三类:
  • 基础数据类型
  • 复合数据类型
  • 自定义数据类型
(2)在工作中,基础数据类型和复合数据类型其实就足够使用了。

2,基础数据类型

(1)字符串:包括:CHARVARCHARSTRING,一般直接用 STRING 即可。
(2)二进制:包括:BINARYVARBINARYBYTES
(3)数值类型:包括 DECIMALNUMERICTINYINTSMALLINTINT \ INTEGERBIGINTFLOATDOUBLE
  • 整数一般使用 INT 或者 BIGINTINTBIGINT 的取值范围和 Java 中的 intlong 的取值范围一样。
  • 单精度的浮点数据使用 FLOAT 类型。
  • 双精度的浮点数据使用 DOUBLE 类型。
  • 存储金融数据(例如:商品价格)的时候建议使用 DOUBLE 或者 DECIMALDECIMAL 的取值范围更大一些。
(4)布尔类型:主要就是 BOOLEAN 了。
(5)空值NULL
(6)日期时间:包括:DATETIMETIMESTAMPTIMESTAMP_LTZINTERVALINTERVAL YEAR TO MONTHINTERVAL DAY TO SECOND)。
  • DATE:返回的是“年-月-日”格式的不带时区的日期类型。
  • TIME:返回的是“小时:分钟:秒”格式的不带时区的数据类型。
  • TIMESTAMP:返回的是“年-月-日 小时:分钟:秒”格式的不带时区的时间类型。
  • TIMESTAMP_LTZ:返回的是“年-月-日 小时:分钟:秒”格式的带本地时区的时间类型。
    • TIMESTAMP_LTZ 的时区信息是由 Flink SQL 任务的全局配置决定的,可以通过 table.local-time-zone 参数来设置时区,如果不设置程序默认读取本地电脑的时区。
  • INTERVAL 格式涉及的种类比较多:例如:INTERVAL YEAR TO MONTHINTERVAL DAY TO SECOND 等。
    • INTERVAL 主要是应用在 SELECT 语句中,用于给 TIMESTAMPTIMESTAMP_LTZ 类型的时间添加偏移量的。
    • 例如:给 TIMESTAMP 加、减几天、几个月或者几年。

3,复合数据类型

(1)数组ARRAY,类似于 Hive 中的 ARRAY
(2)键值MAP,类似于 Hive 中的 Map 一样。
(3)集合MULTISET,类似于 Java 中的 List,允许重复的数据。
(4)对象ROW,类似于 Java 中的自定义对象。

附:日期时间使用详解

1,不同日期格式的区别

(1)下面样例使用 Scala 语言对比 DATETIMETIMESTAMPTIMESTAMP_LTZ 之间的区别,以及 INTERVAL 的使用。
注意:
  • 建议在代码中手工设置时区,如果不设置,程序会默认读取所在机器的时区信息,这样可能会返回 UTC 时区的数据,比国内时间少 8 个小时,所以一般建议手工设置国内的时区。
  • 建议将代码并行度设置为 1,这样便于观察输出数据的情况。
  • TIMESTAMP(3):里面的参数 3 表示保留 3 位,也就是精确到毫秒级别,如果不指定默认是保留 6 位,最多支持到 9 位,但是常见的都是保留 3 位精确到毫秒级别就行了。
import java.time.ZoneId
import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object DateTimeSQL {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      //指定执行模式,支持inBatchMode和inStreamingMode
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //指定国内的时区
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))

    //设置全局并行度为1
    tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")

    val inTableSql =
      """
        |CREATE TABLE date_table (
        |  d_date DATE,
        |  d_time TIME,
        |  d_timestamp TIMESTAMP,
        |  d_timestamp_ltz TIMESTAMP_LTZ(3)
        |) WITH (
        |  'connector' = 'datagen',
        |  'rows-per-second' = '1'
        |)
        |""".stripMargin
    tEnv.executeSql(inTableSql)

    //查看输入表中的数据
    val querySQL =
      """
        |SELECT
        |  d_date,
        |  d_time,
        |  d_timestamp,
        |  d_timestamp_ltz,
        |  d_timestamp_ltz + INTERVAL '1' SECOND AS d_timestamp_ltz_second,
        |  d_timestamp_ltz + INTERVAL '1' MINUTE AS d_timestamp_ltz_minute,
        |  d_timestamp_ltz + INTERVAL '1' HOUR AS d_timestamp_ltz_hour,
        |  d_timestamp_ltz + INTERVAL '1' DAY AS d_timestamp_ltz_day,
        |  d_timestamp_ltz + INTERVAL '1 02' DAY TO HOUR   AS d_timestamp_ltz_day_to_hour,
        |  d_timestamp_ltz + INTERVAL '02:03' HOUR TO MINUTE   AS d_timestamp_ltz_hour_to_minute
        |FROM date_table
        |""".stripMargin
    tEnv.executeSql(querySQL).print()
  }
}
import java.time.ZoneId;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DateTimeSQLJava {
    public static void main(String[] args) {
        // 创建执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                // 指定执行模式,支持 inBatchMode 和 inStreamingMode
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 指定国内的时区
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));

        // 设置全局并行度为1
        Configuration configuration = tEnv.getConfig().getConfiguration();
        configuration.setString(CoreOptions.DEFAULT_PARALLELISM.key(), "1");

        String inTableSql =
                "CREATE TABLE date_table (\n" +
                        "  d_date DATE,\n" +
                        "  d_time TIME,\n" +
                        "  d_timestamp TIMESTAMP,\n" +
                        "  d_timestamp_ltz TIMESTAMP_LTZ(3)\n" +
                        ") WITH (\n" +
                        "  'connector' = 'datagen',\n" +
                        "  'rows-per-second' = '1'\n" +
                        ")";
        tEnv.executeSql(inTableSql);

        // 查看输入表中的数据
        String querySQL =
                "SELECT\n" +
                        "  d_date,\n" +
                        "  d_time,\n" +
                        "  d_timestamp,\n" +
                        "  d_timestamp_ltz,\n" +
                        "  d_timestamp_ltz + INTERVAL '1' SECOND AS d_timestamp_ltz_second,\n" +
                        "  d_timestamp_ltz + INTERVAL '1' MINUTE AS d_timestamp_ltz_minute,\n" +
                        "  d_timestamp_ltz + INTERVAL '1' HOUR AS d_timestamp_ltz_hour,\n" +
                        "  d_timestamp_ltz + INTERVAL '1' DAY AS d_timestamp_ltz_day,\n" +
                        "  d_timestamp_ltz + INTERVAL '1 02' DAY TO HOUR" +
                        "      AS d_timestamp_ltz_day_to_hour,\n" +
                        "  d_timestamp_ltz + INTERVAL '02:03' HOUR TO MINUTE" +
                        "      AS d_timestamp_ltz_hour_to_minute\n" +
                        "FROM date_table";
        tEnv.executeSql(querySQL).print();
    }
}

(2)执行代码,运行结果如下:

2,各种日期函数的使用

(1)下面使用 Scala 语言演示几个相关的日期函数的使用:
注意:这里面列出来的这些日期函数都是和时区有关系的,只要设置了国内时区,那么返回的数据格式都是国内的时间。
import java.time.ZoneId
import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object DateTimeFuncSQL {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      //指定执行模式,支持inBatchMode和inStreamingMode
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //指定国内的时区
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))

    //设置全局并行度为1
    tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")
    
    val querySQL =
      """
        |SELECT
        |CURRENT_DATE,
        |LOCALTIME,
        |CURRENT_TIME,
        |LOCALTIMESTAMP,
        |CURRENT_TIMESTAMP,
        |CURRENT_ROW_TIMESTAMP(),
        |NOW(),
        |PROCTIME(),
        |DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss')
        |""".stripMargin

    tEnv.executeSql(querySQL).print()
  }
}
import java.time.ZoneId;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DateTimeFuncSQLJava {
    public static void main(String[] args) {
        // 创建执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                // 指定执行模式,支持 inBatchMode 和 inStreamingMode
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 指定国内的时区
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));

        // 设置全局并行度为1
        tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM.key(), "1");

        String querySQL =
                "SELECT " +
                        "CURRENT_DATE, " +
                        "LOCALTIME, " +
                        "CURRENT_TIME, " +
                        "LOCALTIMESTAMP, " +
                        "CURRENT_TIMESTAMP, " +
                        "CURRENT_ROW_TIMESTAMP(), " +
                        "NOW(), " +
                        "PROCTIME(), " +
                        "DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss')";

        tEnv.executeSql(querySQL).print();
    }
}

(2)执行代码,运行结果如下:

3,CURRENT_TIMESTAMP 和 CURRENT_ROW_TIMESTAM() 对比

(1)CURRENT_TIMESTAMP CURRENT_ROW_TIMESTAM() 之间是有一些区别的:
  • CURRENT_TIMESTAMP:在批处理模式下,它会在查询开始时生成一个时间,给这一批的离线数据返回同一个时间。在流处理模式下,他会针对每条数据分别生成一个时间。
  • CURRENT_ROW_TIMESTAMP():无论在批处理模式下,还是在流处理模式下,效果都是一样的,他都会对每条数据分别生成时间。

(2)想要验证这个效果,需要指定一个在批处理模式和流处理模式下都可以使用的 Source 组件,这样便于对比分析:在这里我们使用 filesystem
  • 我们在 HDFS 中添加一个 student.json 文件,文件内容如下:
[
  {"name":"Hangge","age":19},
  {"name":"Jack","age":16},
  {"name":"Jessic","age":19},
  {"name":"Mick","age":20},
  {"name":"Joy","age":19},
  {"name":"Say","age":20},
  {"name":"Kahu","age":19},
  {"name":"Hart","age":16}
]
{"name":"Hangge","age":19}
{"name":"Jack","age":16}
{"name":"Jessic","age":19}
{"name":"Mick","age":20}
{"name":"Joy","age":19}
{"name":"Say","age":20}
{"name":"Kahu","age":19}
{"name":"Hart","age":16}

(3)首先是测试在流处理模式下执行情况,下面是 Scala 语言代码:
import java.time.ZoneId
import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object DateTimeFuncSQL {

  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      //指定执行模式,支持inBatchMode和inStreamingMode
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //指定国内的时区
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))

    //设置全局并行度为1
    tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")

    //创建输入表
    val inTableSql =
      """
        |CREATE TABLE file_source(
        |  name STRING,
        |  age INT
        |)WITH(
        |  'connector' = 'filesystem',
        |  'path' = 'hdfs://192.168.121.128:9000/student.json',
        |  'format' = 'json',
        |  'json.fail-on-missing-field' = 'false',
        |  'json.ignore-parse-errors' = 'true'
        |)
        |""".stripMargin
    tEnv.executeSql(inTableSql)

    val querySQL =
      """
        |SELECT
        |name,
        |age,
        |CURRENT_TIMESTAMP,
        |CURRENT_ROW_TIMESTAMP()
        |FROM file_source
        |""".stripMargin

    tEnv.executeSql(querySQL).print()
  }
}
import java.time.ZoneId;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DateTimeFuncSQLJava {
    public static void main(String[] args) {
        // 创建执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                // 指定执行模式,支持 inBatchMode 和 inStreamingMode
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 指定国内的时区
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));

        // 设置全局并行度为1
        tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM.key(), "1");

        // 创建输入表
        String inTableSql =
                "CREATE TABLE file_source( " +
                        "  name STRING, " +
                        "  age INT " +
                        ") WITH ( " +
                        "  'connector' = 'filesystem', " +
                        "  'path' = 'hdfs://192.168.121.128:9000/student.json', " +
                        "  'format' = 'json', " +
                        "  'json.fail-on-missing-field' = 'false', " +
                        "  'json.ignore-parse-errors' = 'true' " +
                        ")";
        tEnv.executeSql(inTableSql);

        // 查询数据
        String querySQL =
                "SELECT " +
                        "name, " +
                        "age, " +
                        "CURRENT_TIMESTAMP, " +
                        "CURRENT_ROW_TIMESTAMP() " +
                        "FROM file_source";
        tEnv.executeSql(querySQL).print();
    }
}
注意:可能会存在某几条数据的时间戳是一样的,属于正常现象,说明程序处理的比较快

(4)修改代码,我们改成批处理模式执行:
//创建执行环境
val settings = EnvironmentSettings
  .newInstance()
  //指定执行模式,支持inBatchMode和inStreamingMode
  .inBatchMode()
  .build()
val tEnv = TableEnvironment.create(settings)
评论

全部评论(0)

回到顶部