Flink SQL - 常见数据类型详解(附:各种日期格式、日期函数使用样例)
作者:hangge | 2025-04-21 09:04
一、数据类型详解
1,基本介绍
(1)Flink SQL 中支持的数据类型大致可以分为三类:
- 基础数据类型
- 复合数据类型
- 自定义数据类型
(2)在工作中,基础数据类型和复合数据类型其实就足够使用了。
2,基础数据类型
(1)字符串:包括:CHAR、VARCHAR、STRING,一般直接用 STRING 即可。
(2)二进制:包括:BINARY、VARBINARY、BYTES。
(3)数值类型:包括 DECIMAL、NUMERIC、TINYINT、SMALLINT、INT \ INTEGER、BIGINT、FLOAT、DOUBLE。
- 整数一般使用 INT 或者 BIGINT,INT 和 BIGINT 的取值范围和 Java 中的 int 和 long 的取值范围一样。
- 单精度的浮点数据使用 FLOAT 类型。
- 双精度的浮点数据使用 DOUBLE 类型。
- 存储金融数据(例如:商品价格)的时候建议使用 DOUBLE 或者 DECIMAL,DECIMAL 的取值范围更大一些。
(4)布尔类型:主要就是 BOOLEAN 了。
(5)空值:NULL。
(6)日期时间:包括:DATE、TIME、TIMESTAMP、TIMESTAMP_LTZ、INTERVAL(INTERVAL YEAR TO MONTH、 INTERVAL DAY TO SECOND)。
- DATE:返回的是“年-月-日”格式的不带时区的日期类型。
- TIME:返回的是“小时:分钟:秒”格式的不带时区的数据类型。
- TIMESTAMP:返回的是“年-月-日 小时:分钟:秒”格式的不带时区的时间类型。
- TIMESTAMP_LTZ:返回的是“年-月-日 小时:分钟:秒”格式的带本地时区的时间类型。
- TIMESTAMP_LTZ 的时区信息是由 Flink SQL 任务的全局配置决定的,可以通过 table.local-time-zone 参数来设置时区,如果不设置程序默认读取本地电脑的时区。
- INTERVAL 格式涉及的种类比较多:例如:INTERVAL YEAR TO MONTH、 INTERVAL DAY TO SECOND 等。
- INTERVAL 主要是应用在 SELECT 语句中,用于给 TIMESTAMP、TIMESTAMP_LTZ 类型的时间添加偏移量的。
- 例如:给 TIMESTAMP 加、减几天、几个月或者几年。
3,复合数据类型
(1)数组:ARRAY,类似于 Hive 中的 ARRAY。
(2)键值:MAP,类似于 Hive 中的 Map 一样。
(3)集合:MULTISET,类似于 Java 中的 List,允许重复的数据。
(4)对象:ROW,类似于 Java 中的自定义对象。
附:日期时间使用详解
1,不同日期格式的区别
(1)下面样例使用 Scala 语言对比 DATE、TIME、TIMESTAMP、TIMESTAMP_LTZ 之间的区别,以及 INTERVAL 的使用。
注意:
- 建议在代码中手工设置时区,如果不设置,程序会默认读取所在机器的时区信息,这样可能会返回 UTC 时区的数据,比国内时间少 8 个小时,所以一般建议手工设置国内的时区。
- 建议将代码并行度设置为 1,这样便于观察输出数据的情况。
- TIMESTAMP(3):里面的参数 3 表示保留 3 位,也就是精确到毫秒级别,如果不指定默认是保留 6 位,最多支持到 9 位,但是常见的都是保留 3 位精确到毫秒级别就行了。
import java.time.ZoneId
import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
object DateTimeSQL {
def main(args: Array[String]): Unit = {
//创建执行环境
val settings = EnvironmentSettings
.newInstance()
//指定执行模式,支持inBatchMode和inStreamingMode
.inStreamingMode()
.build()
val tEnv = TableEnvironment.create(settings)
//指定国内的时区
tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
//设置全局并行度为1
tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")
val inTableSql =
"""
|CREATE TABLE date_table (
| d_date DATE,
| d_time TIME,
| d_timestamp TIMESTAMP,
| d_timestamp_ltz TIMESTAMP_LTZ(3)
|) WITH (
| 'connector' = 'datagen',
| 'rows-per-second' = '1'
|)
|""".stripMargin
tEnv.executeSql(inTableSql)
//查看输入表中的数据
val querySQL =
"""
|SELECT
| d_date,
| d_time,
| d_timestamp,
| d_timestamp_ltz,
| d_timestamp_ltz + INTERVAL '1' SECOND AS d_timestamp_ltz_second,
| d_timestamp_ltz + INTERVAL '1' MINUTE AS d_timestamp_ltz_minute,
| d_timestamp_ltz + INTERVAL '1' HOUR AS d_timestamp_ltz_hour,
| d_timestamp_ltz + INTERVAL '1' DAY AS d_timestamp_ltz_day,
| d_timestamp_ltz + INTERVAL '1 02' DAY TO HOUR AS d_timestamp_ltz_day_to_hour,
| d_timestamp_ltz + INTERVAL '02:03' HOUR TO MINUTE AS d_timestamp_ltz_hour_to_minute
|FROM date_table
|""".stripMargin
tEnv.executeSql(querySQL).print()
}
}
- 下面是使用 Java 语言实现同样功能:
import java.time.ZoneId;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
public class DateTimeSQLJava {
public static void main(String[] args) {
// 创建执行环境
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
// 指定执行模式,支持 inBatchMode 和 inStreamingMode
.inStreamingMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// 指定国内的时区
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
// 设置全局并行度为1
Configuration configuration = tEnv.getConfig().getConfiguration();
configuration.setString(CoreOptions.DEFAULT_PARALLELISM.key(), "1");
String inTableSql =
"CREATE TABLE date_table (\n" +
" d_date DATE,\n" +
" d_time TIME,\n" +
" d_timestamp TIMESTAMP,\n" +
" d_timestamp_ltz TIMESTAMP_LTZ(3)\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second' = '1'\n" +
")";
tEnv.executeSql(inTableSql);
// 查看输入表中的数据
String querySQL =
"SELECT\n" +
" d_date,\n" +
" d_time,\n" +
" d_timestamp,\n" +
" d_timestamp_ltz,\n" +
" d_timestamp_ltz + INTERVAL '1' SECOND AS d_timestamp_ltz_second,\n" +
" d_timestamp_ltz + INTERVAL '1' MINUTE AS d_timestamp_ltz_minute,\n" +
" d_timestamp_ltz + INTERVAL '1' HOUR AS d_timestamp_ltz_hour,\n" +
" d_timestamp_ltz + INTERVAL '1' DAY AS d_timestamp_ltz_day,\n" +
" d_timestamp_ltz + INTERVAL '1 02' DAY TO HOUR" +
" AS d_timestamp_ltz_day_to_hour,\n" +
" d_timestamp_ltz + INTERVAL '02:03' HOUR TO MINUTE" +
" AS d_timestamp_ltz_hour_to_minute\n" +
"FROM date_table";
tEnv.executeSql(querySQL).print();
}
}
(2)执行代码,运行结果如下:

2,各种日期函数的使用
(1)下面使用 Scala 语言演示几个相关的日期函数的使用:
注意:这里面列出来的这些日期函数都是和时区有关系的,只要设置了国内时区,那么返回的数据格式都是国内的时间。
import java.time.ZoneId
import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
object DateTimeFuncSQL {
def main(args: Array[String]): Unit = {
//创建执行环境
val settings = EnvironmentSettings
.newInstance()
//指定执行模式,支持inBatchMode和inStreamingMode
.inStreamingMode()
.build()
val tEnv = TableEnvironment.create(settings)
//指定国内的时区
tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
//设置全局并行度为1
tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")
val querySQL =
"""
|SELECT
|CURRENT_DATE,
|LOCALTIME,
|CURRENT_TIME,
|LOCALTIMESTAMP,
|CURRENT_TIMESTAMP,
|CURRENT_ROW_TIMESTAMP(),
|NOW(),
|PROCTIME(),
|DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss')
|""".stripMargin
tEnv.executeSql(querySQL).print()
}
}
- 下面是使用 Java 语言实现同样功能:
import java.time.ZoneId;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
public class DateTimeFuncSQLJava {
public static void main(String[] args) {
// 创建执行环境
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
// 指定执行模式,支持 inBatchMode 和 inStreamingMode
.inStreamingMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// 指定国内的时区
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
// 设置全局并行度为1
tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM.key(), "1");
String querySQL =
"SELECT " +
"CURRENT_DATE, " +
"LOCALTIME, " +
"CURRENT_TIME, " +
"LOCALTIMESTAMP, " +
"CURRENT_TIMESTAMP, " +
"CURRENT_ROW_TIMESTAMP(), " +
"NOW(), " +
"PROCTIME(), " +
"DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss')";
tEnv.executeSql(querySQL).print();
}
}
(2)执行代码,运行结果如下:

3,CURRENT_TIMESTAMP 和 CURRENT_ROW_TIMESTAM() 对比
(1)CURRENT_TIMESTAMP 和 CURRENT_ROW_TIMESTAM() 之间是有一些区别的:
- CURRENT_TIMESTAMP:在批处理模式下,它会在查询开始时生成一个时间,给这一批的离线数据返回同一个时间。在流处理模式下,他会针对每条数据分别生成一个时间。
- CURRENT_ROW_TIMESTAMP():无论在批处理模式下,还是在流处理模式下,效果都是一样的,他都会对每条数据分别生成时间。
(2)想要验证这个效果,需要指定一个在批处理模式和流处理模式下都可以使用的 Source 组件,这样便于对比分析:在这里我们使用 filesystem。
- 我们在 HDFS 中添加一个 student.json 文件,文件内容如下:
[
{"name":"Hangge","age":19},
{"name":"Jack","age":16},
{"name":"Jessic","age":19},
{"name":"Mick","age":20},
{"name":"Joy","age":19},
{"name":"Say","age":20},
{"name":"Kahu","age":19},
{"name":"Hart","age":16}
]
- 其实, 文件内容写成如下形式也是可以的:
{"name":"Hangge","age":19}
{"name":"Jack","age":16}
{"name":"Jessic","age":19}
{"name":"Mick","age":20}
{"name":"Joy","age":19}
{"name":"Say","age":20}
{"name":"Kahu","age":19}
{"name":"Hart","age":16}
(3)首先是测试在流处理模式下执行情况,下面是 Scala 语言代码:
import java.time.ZoneId
import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
object DateTimeFuncSQL {
def main(args: Array[String]): Unit = {
//创建执行环境
val settings = EnvironmentSettings
.newInstance()
//指定执行模式,支持inBatchMode和inStreamingMode
.inStreamingMode()
.build()
val tEnv = TableEnvironment.create(settings)
//指定国内的时区
tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
//设置全局并行度为1
tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")
//创建输入表
val inTableSql =
"""
|CREATE TABLE file_source(
| name STRING,
| age INT
|)WITH(
| 'connector' = 'filesystem',
| 'path' = 'hdfs://192.168.121.128:9000/student.json',
| 'format' = 'json',
| 'json.fail-on-missing-field' = 'false',
| 'json.ignore-parse-errors' = 'true'
|)
|""".stripMargin
tEnv.executeSql(inTableSql)
val querySQL =
"""
|SELECT
|name,
|age,
|CURRENT_TIMESTAMP,
|CURRENT_ROW_TIMESTAMP()
|FROM file_source
|""".stripMargin
tEnv.executeSql(querySQL).print()
}
}
- 下面是使用 Java 语言实现同样功能:
import java.time.ZoneId;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
public class DateTimeFuncSQLJava {
public static void main(String[] args) {
// 创建执行环境
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
// 指定执行模式,支持 inBatchMode 和 inStreamingMode
.inStreamingMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// 指定国内的时区
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
// 设置全局并行度为1
tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM.key(), "1");
// 创建输入表
String inTableSql =
"CREATE TABLE file_source( " +
" name STRING, " +
" age INT " +
") WITH ( " +
" 'connector' = 'filesystem', " +
" 'path' = 'hdfs://192.168.121.128:9000/student.json', " +
" 'format' = 'json', " +
" 'json.fail-on-missing-field' = 'false', " +
" 'json.ignore-parse-errors' = 'true' " +
")";
tEnv.executeSql(inTableSql);
// 查询数据
String querySQL =
"SELECT " +
"name, " +
"age, " +
"CURRENT_TIMESTAMP, " +
"CURRENT_ROW_TIMESTAMP() " +
"FROM file_source";
tEnv.executeSql(querySQL).print();
}
}
- 执行程序,可以看到控制台输出内容如下。从这个结果中可以看出,CURRENT_TIMESTAMP 和 CURRENT_ROW_TIMESTAM() 的效果是一样的,他们返回的这些数据的时间戳是不一样的。
注意:可能会存在某几条数据的时间戳是一样的,属于正常现象,说明程序处理的比较快
(4)修改代码,我们改成批处理模式执行:
//创建执行环境 val settings = EnvironmentSettings .newInstance() //指定执行模式,支持inBatchMode和inStreamingMode .inBatchMode() .build() val tEnv = TableEnvironment.create(settings)
- 执行程序,可以看到控制台输出内容如下。从这个结果中可以看出,CURRENT_TIMESTAM 返回的时间戳都是一样的。而 CURRENT_ROW_TIMESTAM() 返回的时间戳是不一样的。

全部评论(0)