Flink SQL - 常见数据类型详解(附:各种日期格式、日期函数使用样例)
作者:hangge | 2025-04-21 09:04
一、数据类型详解
1,基本介绍
(1)Flink SQL 中支持的数据类型大致可以分为三类:
- 基础数据类型
- 复合数据类型
- 自定义数据类型
(2)在工作中,基础数据类型和复合数据类型其实就足够使用了。
2,基础数据类型
(1)字符串:包括:CHAR、VARCHAR、STRING,一般直接用 STRING 即可。
(2)二进制:包括:BINARY、VARBINARY、BYTES。
(3)数值类型:包括 DECIMAL、NUMERIC、TINYINT、SMALLINT、INT \ INTEGER、BIGINT、FLOAT、DOUBLE。
- 整数一般使用 INT 或者 BIGINT,INT 和 BIGINT 的取值范围和 Java 中的 int 和 long 的取值范围一样。
- 单精度的浮点数据使用 FLOAT 类型。
- 双精度的浮点数据使用 DOUBLE 类型。
- 存储金融数据(例如:商品价格)的时候建议使用 DOUBLE 或者 DECIMAL,DECIMAL 的取值范围更大一些。
(4)布尔类型:主要就是 BOOLEAN 了。
(5)空值:NULL。
(6)日期时间:包括:DATE、TIME、TIMESTAMP、TIMESTAMP_LTZ、INTERVAL(INTERVAL YEAR TO MONTH、 INTERVAL DAY TO SECOND)。
- DATE:返回的是“年-月-日”格式的不带时区的日期类型。
- TIME:返回的是“小时:分钟:秒”格式的不带时区的数据类型。
- TIMESTAMP:返回的是“年-月-日 小时:分钟:秒”格式的不带时区的时间类型。
- TIMESTAMP_LTZ:返回的是“年-月-日 小时:分钟:秒”格式的带本地时区的时间类型。
- TIMESTAMP_LTZ 的时区信息是由 Flink SQL 任务的全局配置决定的,可以通过 table.local-time-zone 参数来设置时区,如果不设置程序默认读取本地电脑的时区。
- INTERVAL 格式涉及的种类比较多:例如:INTERVAL YEAR TO MONTH、 INTERVAL DAY TO SECOND 等。
- INTERVAL 主要是应用在 SELECT 语句中,用于给 TIMESTAMP、TIMESTAMP_LTZ 类型的时间添加偏移量的。
- 例如:给 TIMESTAMP 加、减几天、几个月或者几年。
3,复合数据类型
(1)数组:ARRAY,类似于 Hive 中的 ARRAY。
(2)键值:MAP,类似于 Hive 中的 Map 一样。
(3)集合:MULTISET,类似于 Java 中的 List,允许重复的数据。
(4)对象:ROW,类似于 Java 中的自定义对象。
附:日期时间使用详解
1,不同日期格式的区别
(1)下面样例使用 Scala 语言对比 DATE、TIME、TIMESTAMP、TIMESTAMP_LTZ 之间的区别,以及 INTERVAL 的使用。
注意:
- 建议在代码中手工设置时区,如果不设置,程序会默认读取所在机器的时区信息,这样可能会返回 UTC 时区的数据,比国内时间少 8 个小时,所以一般建议手工设置国内的时区。
- 建议将代码并行度设置为 1,这样便于观察输出数据的情况。
- TIMESTAMP(3):里面的参数 3 表示保留 3 位,也就是精确到毫秒级别,如果不指定默认是保留 6 位,最多支持到 9 位,但是常见的都是保留 3 位精确到毫秒级别就行了。
import java.time.ZoneId import org.apache.flink.configuration.CoreOptions import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment} object DateTimeSQL { def main(args: Array[String]): Unit = { //创建执行环境 val settings = EnvironmentSettings .newInstance() //指定执行模式,支持inBatchMode和inStreamingMode .inStreamingMode() .build() val tEnv = TableEnvironment.create(settings) //指定国内的时区 tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai")) //设置全局并行度为1 tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1") val inTableSql = """ |CREATE TABLE date_table ( | d_date DATE, | d_time TIME, | d_timestamp TIMESTAMP, | d_timestamp_ltz TIMESTAMP_LTZ(3) |) WITH ( | 'connector' = 'datagen', | 'rows-per-second' = '1' |) |""".stripMargin tEnv.executeSql(inTableSql) //查看输入表中的数据 val querySQL = """ |SELECT | d_date, | d_time, | d_timestamp, | d_timestamp_ltz, | d_timestamp_ltz + INTERVAL '1' SECOND AS d_timestamp_ltz_second, | d_timestamp_ltz + INTERVAL '1' MINUTE AS d_timestamp_ltz_minute, | d_timestamp_ltz + INTERVAL '1' HOUR AS d_timestamp_ltz_hour, | d_timestamp_ltz + INTERVAL '1' DAY AS d_timestamp_ltz_day, | d_timestamp_ltz + INTERVAL '1 02' DAY TO HOUR AS d_timestamp_ltz_day_to_hour, | d_timestamp_ltz + INTERVAL '02:03' HOUR TO MINUTE AS d_timestamp_ltz_hour_to_minute |FROM date_table |""".stripMargin tEnv.executeSql(querySQL).print() } }
- 下面是使用 Java 语言实现同样功能:
import java.time.ZoneId; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; public class DateTimeSQLJava { public static void main(String[] args) { // 创建执行环境 EnvironmentSettings settings = EnvironmentSettings .newInstance() // 指定执行模式,支持 inBatchMode 和 inStreamingMode .inStreamingMode() .build(); TableEnvironment tEnv = TableEnvironment.create(settings); // 指定国内的时区 tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); // 设置全局并行度为1 Configuration configuration = tEnv.getConfig().getConfiguration(); configuration.setString(CoreOptions.DEFAULT_PARALLELISM.key(), "1"); String inTableSql = "CREATE TABLE date_table (\n" + " d_date DATE,\n" + " d_time TIME,\n" + " d_timestamp TIMESTAMP,\n" + " d_timestamp_ltz TIMESTAMP_LTZ(3)\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1'\n" + ")"; tEnv.executeSql(inTableSql); // 查看输入表中的数据 String querySQL = "SELECT\n" + " d_date,\n" + " d_time,\n" + " d_timestamp,\n" + " d_timestamp_ltz,\n" + " d_timestamp_ltz + INTERVAL '1' SECOND AS d_timestamp_ltz_second,\n" + " d_timestamp_ltz + INTERVAL '1' MINUTE AS d_timestamp_ltz_minute,\n" + " d_timestamp_ltz + INTERVAL '1' HOUR AS d_timestamp_ltz_hour,\n" + " d_timestamp_ltz + INTERVAL '1' DAY AS d_timestamp_ltz_day,\n" + " d_timestamp_ltz + INTERVAL '1 02' DAY TO HOUR" + " AS d_timestamp_ltz_day_to_hour,\n" + " d_timestamp_ltz + INTERVAL '02:03' HOUR TO MINUTE" + " AS d_timestamp_ltz_hour_to_minute\n" + "FROM date_table"; tEnv.executeSql(querySQL).print(); } }
(2)执行代码,运行结果如下:

2,各种日期函数的使用
(1)下面使用 Scala 语言演示几个相关的日期函数的使用:
注意:这里面列出来的这些日期函数都是和时区有关系的,只要设置了国内时区,那么返回的数据格式都是国内的时间。
import java.time.ZoneId import org.apache.flink.configuration.CoreOptions import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment} object DateTimeFuncSQL { def main(args: Array[String]): Unit = { //创建执行环境 val settings = EnvironmentSettings .newInstance() //指定执行模式,支持inBatchMode和inStreamingMode .inStreamingMode() .build() val tEnv = TableEnvironment.create(settings) //指定国内的时区 tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai")) //设置全局并行度为1 tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1") val querySQL = """ |SELECT |CURRENT_DATE, |LOCALTIME, |CURRENT_TIME, |LOCALTIMESTAMP, |CURRENT_TIMESTAMP, |CURRENT_ROW_TIMESTAMP(), |NOW(), |PROCTIME(), |DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss') |""".stripMargin tEnv.executeSql(querySQL).print() } }
- 下面是使用 Java 语言实现同样功能:
import java.time.ZoneId; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; public class DateTimeFuncSQLJava { public static void main(String[] args) { // 创建执行环境 EnvironmentSettings settings = EnvironmentSettings .newInstance() // 指定执行模式,支持 inBatchMode 和 inStreamingMode .inStreamingMode() .build(); TableEnvironment tEnv = TableEnvironment.create(settings); // 指定国内的时区 tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); // 设置全局并行度为1 tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM.key(), "1"); String querySQL = "SELECT " + "CURRENT_DATE, " + "LOCALTIME, " + "CURRENT_TIME, " + "LOCALTIMESTAMP, " + "CURRENT_TIMESTAMP, " + "CURRENT_ROW_TIMESTAMP(), " + "NOW(), " + "PROCTIME(), " + "DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss')"; tEnv.executeSql(querySQL).print(); } }
(2)执行代码,运行结果如下:

3,CURRENT_TIMESTAMP 和 CURRENT_ROW_TIMESTAM() 对比
(1)CURRENT_TIMESTAMP 和 CURRENT_ROW_TIMESTAM() 之间是有一些区别的:
- CURRENT_TIMESTAMP:在批处理模式下,它会在查询开始时生成一个时间,给这一批的离线数据返回同一个时间。在流处理模式下,他会针对每条数据分别生成一个时间。
- CURRENT_ROW_TIMESTAMP():无论在批处理模式下,还是在流处理模式下,效果都是一样的,他都会对每条数据分别生成时间。
(2)想要验证这个效果,需要指定一个在批处理模式和流处理模式下都可以使用的 Source 组件,这样便于对比分析:在这里我们使用 filesystem。
- 我们在 HDFS 中添加一个 student.json 文件,文件内容如下:
[ {"name":"Hangge","age":19}, {"name":"Jack","age":16}, {"name":"Jessic","age":19}, {"name":"Mick","age":20}, {"name":"Joy","age":19}, {"name":"Say","age":20}, {"name":"Kahu","age":19}, {"name":"Hart","age":16} ]
- 其实, 文件内容写成如下形式也是可以的:
{"name":"Hangge","age":19} {"name":"Jack","age":16} {"name":"Jessic","age":19} {"name":"Mick","age":20} {"name":"Joy","age":19} {"name":"Say","age":20} {"name":"Kahu","age":19} {"name":"Hart","age":16}
(3)首先是测试在流处理模式下执行情况,下面是 Scala 语言代码:
import java.time.ZoneId import org.apache.flink.configuration.CoreOptions import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment} object DateTimeFuncSQL { def main(args: Array[String]): Unit = { //创建执行环境 val settings = EnvironmentSettings .newInstance() //指定执行模式,支持inBatchMode和inStreamingMode .inStreamingMode() .build() val tEnv = TableEnvironment.create(settings) //指定国内的时区 tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai")) //设置全局并行度为1 tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1") //创建输入表 val inTableSql = """ |CREATE TABLE file_source( | name STRING, | age INT |)WITH( | 'connector' = 'filesystem', | 'path' = 'hdfs://192.168.121.128:9000/student.json', | 'format' = 'json', | 'json.fail-on-missing-field' = 'false', | 'json.ignore-parse-errors' = 'true' |) |""".stripMargin tEnv.executeSql(inTableSql) val querySQL = """ |SELECT |name, |age, |CURRENT_TIMESTAMP, |CURRENT_ROW_TIMESTAMP() |FROM file_source |""".stripMargin tEnv.executeSql(querySQL).print() } }
- 下面是使用 Java 语言实现同样功能:
import java.time.ZoneId; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; public class DateTimeFuncSQLJava { public static void main(String[] args) { // 创建执行环境 EnvironmentSettings settings = EnvironmentSettings .newInstance() // 指定执行模式,支持 inBatchMode 和 inStreamingMode .inStreamingMode() .build(); TableEnvironment tEnv = TableEnvironment.create(settings); // 指定国内的时区 tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); // 设置全局并行度为1 tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM.key(), "1"); // 创建输入表 String inTableSql = "CREATE TABLE file_source( " + " name STRING, " + " age INT " + ") WITH ( " + " 'connector' = 'filesystem', " + " 'path' = 'hdfs://192.168.121.128:9000/student.json', " + " 'format' = 'json', " + " 'json.fail-on-missing-field' = 'false', " + " 'json.ignore-parse-errors' = 'true' " + ")"; tEnv.executeSql(inTableSql); // 查询数据 String querySQL = "SELECT " + "name, " + "age, " + "CURRENT_TIMESTAMP, " + "CURRENT_ROW_TIMESTAMP() " + "FROM file_source"; tEnv.executeSql(querySQL).print(); } }
- 执行程序,可以看到控制台输出内容如下。从这个结果中可以看出,CURRENT_TIMESTAMP 和 CURRENT_ROW_TIMESTAM() 的效果是一样的,他们返回的这些数据的时间戳是不一样的。
注意:可能会存在某几条数据的时间戳是一样的,属于正常现象,说明程序处理的比较快

(4)修改代码,我们改成批处理模式执行:
//创建执行环境 val settings = EnvironmentSettings .newInstance() //指定执行模式,支持inBatchMode和inStreamingMode .inBatchMode() .build() val tEnv = TableEnvironment.create(settings)
- 执行程序,可以看到控制台输出内容如下。从这个结果中可以看出,CURRENT_TIMESTAM 返回的时间戳都是一样的。而 CURRENT_ROW_TIMESTAM() 返回的时间戳是不一样的。

全部评论(0)