Flink - WaterMark Usage Explained (2): Handling Late Data
Author: hangge | 2025-03-10 08:39
Flink offers three ways to handle data that arrives too late: discard it, allow an extra lateness period, or collect the late records. The sections below demonstrate each approach with an example.
1. Discarding
1.1 Basic Introduction
By default, Flink simply discards late data.
1.2 Example
(1) Here we reuse the WaterMark sample code from the previous article:
(2) First, start a TCP socket listening on local port 9999 by running the following command in a terminal:
nc -lk 9999
(3) Then run the Flink program. Once it is up, type the following into that terminal:
Note: enter the records one at a time; do not paste them all at once.
key1,1698225607000
key1,1698225600000
key1,1698225612000
key1,1698225603000
(4) The console output is shown below. Note in particular that the window containing the last input record has already fired, so that record does not trigger the window again; by default, Flink simply discards such late data.
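To see why only the last record is dropped, the watermark and window arithmetic can be replayed outside Flink. The following is a minimal plain-Java sketch (class and method names are illustrative, not Flink API): the watermark trails the largest event time seen so far by the 3-second out-of-orderness bound, and a record counts as late once the watermark has reached the end of its 5-second tumbling window.

```java
public class LatenessCheck {
    // Start of the 5-second tumbling event-time window containing ts
    static long windowStart(long ts) {
        return ts - (ts % 5000);
    }

    // Bounded-out-of-orderness watermark: max event time seen so far minus 3s
    static long watermark(long maxEventTime) {
        return maxEventTime - 3000;
    }

    public static void main(String[] args) {
        long[] events = {1698225607000L, 1698225600000L, 1698225612000L, 1698225603000L};
        long maxEventTime = Long.MIN_VALUE;
        for (long ts : events) {
            maxEventTime = Math.max(maxEventTime, ts);
            long wm = watermark(maxEventTime);
            long windowEnd = windowStart(ts) + 5000;
            // A record is late when its window has already fired,
            // i.e. the watermark has passed the window end
            boolean late = wm >= windowEnd;
            System.out.println("event=" + ts + " window=[" + windowStart(ts)
                    + ", " + windowEnd + ") watermark=" + wm + " late=" + late);
        }
    }
}
```

Only the fourth record (event time 1698225603000, window [1698225600000, 1698225605000)) comes out late: by the time it arrives, the third record has pushed the watermark to 1698225609000, past that window's end.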

2. Specifying an Allowed Lateness Period
2.1 Basic Introduction
(1) In some cases, we want to grant late data an extra grace period.
(2) Flink provides the allowedLateness method to set a lateness period; records that arrive within this period can still trigger the window.
2.2 Example
(1) We only need to add one line to the code. Here is the Scala version:
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import java.text.SimpleDateFormat
import java.time.Duration

object HelloWaterMarkScala {
  def main(args: Array[String]): Unit = {
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the global parallelism to 1
    env.setParallelism(1)
    // Define a WatermarkStrategy that tolerates 3 seconds of out-of-order data
    val watermarkStrategy = WatermarkStrategy
      .forBoundedOutOfOrderness[String](Duration.ofSeconds(3)) // allow 3 seconds of out-of-orderness
      .withTimestampAssigner(new SerializableTimestampAssigner[String] {
        override def extractTimestamp(element: String, recordTimestamp: Long): Long = {
          val arr = element.split(",")
          val timestamp = arr(1).toLong
          println(s"Key: ${arr(0)}, EventTime: $timestamp (${sdf.format(timestamp)})")
          timestamp // extract the event time
        }
      })
      .withIdleness(Duration.ofSeconds(20)) // mark the source as idle after 20 seconds
    // Apply the WatermarkStrategy
    env.socketTextStream("192.168.121.128", 9999)
      .assignTimestampsAndWatermarks(watermarkStrategy)
      .map(line => {
        val parts = line.split(",")
        (parts(0), parts(1))
      })
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .allowedLateness(Time.seconds(5)) // allow data up to 5 seconds late
      .reduce(new ReduceFunction[(String, String)] {
        override def reduce(t: (String, String), t1: (String, String)): (String, String) = {
          (t._1, t._2 + " - " + t1._2)
        }
      })
      .print("reduce result")
    env.execute("WaterMark Test Demo")
  }
}
- Here is the Java version:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.text.SimpleDateFormat;
import java.time.Duration;

public class HelloWaterMark {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the global parallelism to 1
        env.setParallelism(1);
        // Define a WatermarkStrategy that tolerates 3 seconds of out-of-order data
        WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // allow 3 seconds of out-of-orderness
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        String[] arr = element.split(",");
                        long timestamp = Long.parseLong(arr[1]);
                        System.out.println("Key: " + arr[0]
                                + ", EventTime: " + timestamp + " (" + sdf.format(timestamp) + ")");
                        return timestamp; // extract the event time
                    }
                })
                .withIdleness(Duration.ofSeconds(20)); // mark the source as idle after 20 seconds
        // Apply the WatermarkStrategy
        env.socketTextStream("192.168.121.128", 9999)
                .assignTimestampsAndWatermarks(watermarkStrategy)
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        return new Tuple2<>(s.split(",")[0], s.split(",")[1]);
                    }
                })
                .keyBy(value -> value.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(5)) // allow data up to 5 seconds late
                .reduce(new ReduceFunction<Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> reduce(Tuple2<String, String> value1,
                                                         Tuple2<String, String> value2) throws Exception {
                        return new Tuple2<>(value1.f0, value1.f1 + " - " + value2.f1);
                    }
                })
                .print("reduce result");
        env.execute("WaterMark Test Demo");
    }
}
(2) Test again with the same input:
key1,1698225607000
key1,1698225600000
key1,1698225612000
key1,1698225603000
(3) The console output is shown below. Pay particular attention to the window containing the last input record:
- The first firing happens when Watermark >= window_end_time.
- A second (or further) firing happens whenever late data for the window arrives while Watermark < window_end_time + allowedLateness.
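The two firing conditions above can be condensed into a small predicate. The following is a plain-Java sketch (names are illustrative, not Flink API) applied to the sample data: the last record's window ends at 1698225605000, and the watermark stands at 1698225609000, which is still below 1698225605000 + 5000, so with allowedLateness(5s) the window fires again.

```java
public class RetriggerCheck {
    // A late record re-triggers its window only while the watermark is at or
    // past the window end (the window already fired once) but has not yet
    // passed windowEnd + allowedLateness (the window state is still kept)
    static boolean refires(long watermark, long windowEnd, long allowedLatenessMs) {
        return watermark >= windowEnd && watermark < windowEnd + allowedLatenessMs;
    }

    public static void main(String[] args) {
        long windowEnd = 1698225605000L; // window [1698225600000, 1698225605000)
        long watermark = 1698225609000L; // after the record with event time 1698225612000
        // With allowedLateness(5s), late record key1,1698225603000 re-triggers the window
        System.out.println(refires(watermark, windowEnd, 5000)); // true
        // Without allowedLateness (0 ms), the same record is simply dropped
        System.out.println(refires(watermark, windowEnd, 0)); // false
    }
}
```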

3. Collecting Late Data
3.1 Basic Introduction
The sideOutputLateData function collects late records into a side output so they can be stored together, which makes later troubleshooting easier.
3.2 Example
(1) In the following sample we capture the records that would otherwise be discarded and print them to the console. Here is the Scala version:
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import java.text.SimpleDateFormat
import java.time.Duration

object HelloWaterMarkScala {
  def main(args: Array[String]): Unit = {
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the global parallelism to 1
    env.setParallelism(1)
    // Define a WatermarkStrategy that tolerates 3 seconds of out-of-order data
    val watermarkStrategy = WatermarkStrategy
      .forBoundedOutOfOrderness[String](Duration.ofSeconds(3)) // allow 3 seconds of out-of-orderness
      .withTimestampAssigner(new SerializableTimestampAssigner[String] {
        override def extractTimestamp(element: String, recordTimestamp: Long): Long = {
          val arr = element.split(",")
          val timestamp = arr(1).toLong
          println(s"Key: ${arr(0)}, EventTime: $timestamp (${sdf.format(timestamp)})")
          timestamp // extract the event time
        }
      })
      .withIdleness(Duration.ofSeconds(20)) // mark the source as idle after 20 seconds
    // Tag for capturing the data that would otherwise be discarded
    val outputTag = OutputTag[(String, String)]("late-data")
    // Apply the WatermarkStrategy
    val resStream = env.socketTextStream("192.168.121.128", 9999)
      .assignTimestampsAndWatermarks(watermarkStrategy)
      .map(line => {
        val parts = line.split(",")
        (parts(0), parts(1))
      })
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .sideOutputLateData(outputTag) // capture the data that would otherwise be discarded
      .reduce(new ReduceFunction[(String, String)] {
        override def reduce(t: (String, String), t1: (String, String)): (String, String) = {
          (t._1, t._2 + " - " + t1._2)
        }
      })
    // Retrieve the discarded data; here we just print it to the console.
    // In real work it could be stored elsewhere, e.g. Redis or Kafka.
    val sideOutput = resStream.getSideOutput(outputTag)
    sideOutput.print("discarded data")
    // Also print the result stream to the console
    resStream.print("reduce result")
    env.execute("WaterMark Test Demo")
  }
}
- Here is the Java version:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.text.SimpleDateFormat;
import java.time.Duration;

public class HelloWaterMark {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the global parallelism to 1
        env.setParallelism(1);
        // Define a WatermarkStrategy that tolerates 3 seconds of out-of-order data
        WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // allow 3 seconds of out-of-orderness
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        String[] arr = element.split(",");
                        long timestamp = Long.parseLong(arr[1]);
                        System.out.println("Key: " + arr[0] + ", EventTime: "
                                + timestamp + " (" + sdf.format(timestamp) + ")");
                        return timestamp; // extract the event time
                    }
                })
                .withIdleness(Duration.ofSeconds(20)); // mark the source as idle after 20 seconds
        // Tag for capturing the data that would otherwise be discarded
        OutputTag<Tuple2<String, String>> outputTag
                = new OutputTag<Tuple2<String, String>>("late-data") {};
        // Apply the WatermarkStrategy
        DataStream<String> inputStream = env.socketTextStream("192.168.121.128", 9999);
        SingleOutputStreamOperator<Tuple2<String, String>> resStream = inputStream
                .assignTimestampsAndWatermarks(watermarkStrategy)
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        return new Tuple2<>(s.split(",")[0], s.split(",")[1]);
                    }
                })
                .keyBy(tuple -> tuple.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sideOutputLateData(outputTag) // capture the data that would otherwise be discarded
                .reduce(new ReduceFunction<Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> reduce(Tuple2<String, String> t1,
                                                         Tuple2<String, String> t2) {
                        return new Tuple2<>(t1.f0, t1.f1 + " - " + t2.f1);
                    }
                });
        // Retrieve the discarded data; here we just print it to the console.
        // In real work it could be stored elsewhere, e.g. Redis or Kafka.
        DataStream<Tuple2<String, String>> sideOutput = resStream.getSideOutput(outputTag);
        sideOutput.print("discarded data");
        // Also print the result stream to the console
        resStream.print("reduce result");
        env.execute("WaterMark Test Demo");
    }
}
(2) Test again with the same input:
key1,1698225607000
key1,1698225600000
key1,1698225612000
key1,1698225603000
(3) The console output is shown below; the late records that would have been discarded are now collected:
