Flink - State状态详解6(OperatorState样例1:ListState实现批量输出功能)
作者:hangge | 2025-04-11 08:39
六、OperatorState 样例 1:ListState 实现有状态的批量输出功能
1,需求说明
我们想要实现一个批量输出的功能,此时可以考虑在 Sink 组件内部定义一个缓存,但是还要保证数据一定会输出到外部系统。
2,实现逻辑
这个时候就需要借助于状态实现了,通过 CheckpointedFunction 接口的 snapshotState 方法定期将批量缓存的数据保存到状态中,如果程序出现了故障,重启后还可以从状态中将未输出的数据读取到缓存中,继续输出到外部系统。
3,样例代码
(1)首先我们自定义一个 sink 实现批量输出,里面使用 ListState 保存状态数据,下面是 Scala 语言代码,:
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import scala.collection.mutable.ListBuffer
class MyBufferSink extends SinkFunction[(String,Int)] with CheckpointedFunction{
//声明一个ListState类型的状态变量
private var checkpointedState: ListState[(String, Int)] = _
//定义一个本地缓存
private val bufferElements = ListBuffer[(String, Int)]()
/**
* Sink的核心处理逻辑,将接收到的数据输出到外部系统
* 接收到一条数据,这个方法就会执行一次
* @param value
*/
override def invoke(value: (String, Int)): Unit = {
//将接收到的数据保存到本地缓存中
bufferElements += value
//当本地缓存大小到达一定阈值时,将本地缓存中的数据一次性输出到外部系统
if (bufferElements.size == 2) {
println("======start======")
for (element <- bufferElements) {
println(element)
}
println("======end======")
//清空本地缓存中的数据
bufferElements.clear()
}
}
/**
* 将本地缓存中的数据保存到状态中,在执行checkpoint时,会将状态中的数据持久化到外部存储中
* @param context
*/
override def snapshotState(context: FunctionSnapshotContext): Unit = {
//将上次写入到状态中的数据清空
checkpointedState.clear()
//将最新的本地缓存中的数据写入到状态中
for (element <- bufferElements) {
checkpointedState.add(element)
}
}
/**
* 初始化或者恢复状态
* @param context
*/
override def initializeState(context: FunctionInitializationContext): Unit = {
//注册状态
val descriptor = new ListStateDescriptor[(String, Int)](
"buffered-elements",
classOf[(String,Int)]
)
//此时借助于context获取OperatorStateStore,进而获取ListState
checkpointedState = context.getOperatorStateStore.getListState(descriptor)
// 如果是重启任务,需要从外部存储中读取状态数据并写入到本地缓存中
if(context.isRestored) {
checkpointedState.get().forEach(e=>{
bufferElements += e
})
}
}
}
- 然后我们使用这个定义的 Sink 进行批量输出:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object OperatorState_MyBufferSinkDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度为2
env.setParallelism(2)
val text = env.socketTextStream("192.168.121.128", 9999)
import org.apache.flink.api.scala._
text.flatMap(_.split(" "))
.map((_, 1))
.addSink(new MyBufferSink())
env.execute("OperatorState_MyBufferSinkDemo")
}
}
(2)下面是使用 Java 语言实现同样自定义 Sink:
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.util.ArrayList;
import java.util.List;
public class MyBufferSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {
// 声明一个 ListState 类型的状态变量
private transient ListState<Tuple2<String, Integer>> checkpointedState;
// 定义一个本地缓存
private final List<Tuple2<String, Integer>> bufferElements = new ArrayList<>();
/**
* Sink 的核心处理逻辑,将接收到的数据输出到外部系统
* 接收到一条数据,这个方法就会执行一次
*
* @param value
*/
@Override
public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
// 将接收到的数据保存到本地缓存中
bufferElements.add(value);
// 当本地缓存大小到达一定阈值时,将本地缓存中的数据一次性输出到外部系统
if (bufferElements.size() == 2) {
System.out.println("======start======");
for (Tuple2<String, Integer> element : bufferElements) {
System.out.println(element);
}
System.out.println("======end======");
// 清空本地缓存中的数据
bufferElements.clear();
}
}
/**
* 将本地缓存中的数据保存到状态中,在执行 checkpoint 时,会将状态中的数据持久化到外部存储中
*
* @param context
*/
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// 将上次写入到状态中的数据清空
checkpointedState.clear();
// 将最新的本地缓存中的数据写入到状态中
for (Tuple2<String, Integer> element : bufferElements) {
checkpointedState.add(element);
}
}
/**
* 初始化或者恢复状态
*
* @param context
*/
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// 注册状态
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>("buffered-elements", Types.TUPLE(Types.STRING, Types.INT));
// 此时借助于 context 获取 OperatorStateStore,进而获取 ListState
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
// 如果是重启任务,需要从外部存储中读取状态数据并写入到本地缓存中
if (context.isRestored()) {
for (Tuple2<String, Integer> element : checkpointedState.get()) {
bufferElements.add(element);
}
}
}
}
- 然后我们使用这个定义的 Sink 进行批量输出:
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class OperatorState_MyBufferSinkDemo {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为 2
env.setParallelism(2);
// 从 Socket 中读取数据流
DataStream<String> text = env.socketTextStream("192.168.121.128", 9999);
// 数据处理逻辑
text.flatMap((String line, org.apache.flink.util.Collector<Tuple2<String, Integer>> out) -> {
for (String word : line.split(" ")) {
out.collect(new Tuple2<>(word, 1));
}
})
.returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))
.addSink(new MyBufferSink());
// 执行任务
env.execute("OperatorState_MyBufferSinkDemo");
}
}
4,运行测试
(1)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(2)然后运行 Flink 程序。待程序启动后,我们在该终端中输入如下内容。
a b
- 此时控制台没有输出数据。这是因为此时我们给程序设置的并行度为 2。所以 sink 组件会产生 2 个子任务,这两条数据现在被划分到了两个子任务中,每个子任务只收到了 1 条数据,所以不满足输出数据的条件。
c
- 此时发现控制台输出数据了,即第一个子任务的两条数据:

d
- 此时发现控制台又输出数据了,即第二个子任务的两条数据:

全部评论(0)