Flink - State状态详解6(OperatorState样例1:ListState实现批量输出功能)
作者:hangge | 2025-04-11 08:39
六、OperatorState 样例 1:ListState 实现有状态的批量输出功能
1,需求说明
我们想要实现一个批量输出的功能,此时可以考虑在 Sink 组件内部定义一个缓存,但是还要保证数据一定会输出到外部系统。
2,实现逻辑
这个时候就需要借助于状态实现了,通过 CheckpointedFunction 接口的 snapshotState 方法定期将批量缓存的数据保存到状态中,如果程序出现了故障,重启后还可以从状态中将未输出的数据读取到缓存中,继续输出到外部系统。
3,样例代码
(1)首先我们自定义一个 sink 实现批量输出,里面使用 ListState 保存状态数据,下面是 Scala 语言代码,:
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor} import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction import org.apache.flink.streaming.api.functions.sink.SinkFunction import scala.collection.mutable.ListBuffer class MyBufferSink extends SinkFunction[(String,Int)] with CheckpointedFunction{ //声明一个ListState类型的状态变量 private var checkpointedState: ListState[(String, Int)] = _ //定义一个本地缓存 private val bufferElements = ListBuffer[(String, Int)]() /** * Sink的核心处理逻辑,将接收到的数据输出到外部系统 * 接收到一条数据,这个方法就会执行一次 * @param value */ override def invoke(value: (String, Int)): Unit = { //将接收到的数据保存到本地缓存中 bufferElements += value //当本地缓存大小到达一定阈值时,将本地缓存中的数据一次性输出到外部系统 if (bufferElements.size == 2) { println("======start======") for (element <- bufferElements) { println(element) } println("======end======") //清空本地缓存中的数据 bufferElements.clear() } } /** * 将本地缓存中的数据保存到状态中,在执行checkpoint时,会将状态中的数据持久化到外部存储中 * @param context */ override def snapshotState(context: FunctionSnapshotContext): Unit = { //将上次写入到状态中的数据清空 checkpointedState.clear() //将最新的本地缓存中的数据写入到状态中 for (element <- bufferElements) { checkpointedState.add(element) } } /** * 初始化或者恢复状态 * @param context */ override def initializeState(context: FunctionInitializationContext): Unit = { //注册状态 val descriptor = new ListStateDescriptor[(String, Int)]( "buffered-elements", classOf[(String,Int)] ) //此时借助于context获取OperatorStateStore,进而获取ListState checkpointedState = context.getOperatorStateStore.getListState(descriptor) // 如果是重启任务,需要从外部存储中读取状态数据并写入到本地缓存中 if(context.isRestored) { checkpointedState.get().forEach(e=>{ bufferElements += e }) } } }
- 然后我们使用这个定义的 Sink 进行批量输出:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object OperatorState_MyBufferSinkDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //设置并行度为2 env.setParallelism(2) val text = env.socketTextStream("192.168.121.128", 9999) import org.apache.flink.api.scala._ text.flatMap(_.split(" ")) .map((_, 1)) .addSink(new MyBufferSink()) env.execute("OperatorState_MyBufferSinkDemo") } }
(2)下面是使用 Java 语言实现同样自定义 Sink:
import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import java.util.ArrayList; import java.util.List; public class MyBufferSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction { // 声明一个 ListState 类型的状态变量 private transient ListState<Tuple2<String, Integer>> checkpointedState; // 定义一个本地缓存 private final List<Tuple2<String, Integer>> bufferElements = new ArrayList<>(); /** * Sink 的核心处理逻辑,将接收到的数据输出到外部系统 * 接收到一条数据,这个方法就会执行一次 * * @param value */ @Override public void invoke(Tuple2<String, Integer> value, Context context) throws Exception { // 将接收到的数据保存到本地缓存中 bufferElements.add(value); // 当本地缓存大小到达一定阈值时,将本地缓存中的数据一次性输出到外部系统 if (bufferElements.size() == 2) { System.out.println("======start======"); for (Tuple2<String, Integer> element : bufferElements) { System.out.println(element); } System.out.println("======end======"); // 清空本地缓存中的数据 bufferElements.clear(); } } /** * 将本地缓存中的数据保存到状态中,在执行 checkpoint 时,会将状态中的数据持久化到外部存储中 * * @param context */ @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { // 将上次写入到状态中的数据清空 checkpointedState.clear(); // 将最新的本地缓存中的数据写入到状态中 for (Tuple2<String, Integer> element : bufferElements) { checkpointedState.add(element); } } /** * 初始化或者恢复状态 * * @param context */ @Override public void initializeState(FunctionInitializationContext context) throws Exception { // 注册状态 ListStateDescriptor<Tuple2<String, Integer>> descriptor = new ListStateDescriptor<>("buffered-elements", Types.TUPLE(Types.STRING, Types.INT)); // 此时借助于 context 获取 OperatorStateStore,进而获取 ListState checkpointedState = context.getOperatorStateStore().getListState(descriptor); // 如果是重启任务,需要从外部存储中读取状态数据并写入到本地缓存中 if (context.isRestored()) { for (Tuple2<String, Integer> element : checkpointedState.get()) { bufferElements.add(element); } } } }
- 然后我们使用这个定义的 Sink 进行批量输出:
import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class OperatorState_MyBufferSinkDemo { public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置并行度为 2 env.setParallelism(2); // 从 Socket 中读取数据流 DataStream<String> text = env.socketTextStream("192.168.121.128", 9999); // 数据处理逻辑 text.flatMap((String line, org.apache.flink.util.Collector<Tuple2<String, Integer>> out) -> { for (String word : line.split(" ")) { out.collect(new Tuple2<>(word, 1)); } }) .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})) .addSink(new MyBufferSink()); // 执行任务 env.execute("OperatorState_MyBufferSinkDemo"); } }
4,运行测试
(1)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(2)然后运行 Flink 程序。待程序启动后,我们在该终端中输入如下内容。
a b
- 此时控制台没有输出数据。这是因为此时我们给程序设置的并行度为 2。所以 sink 组件会产生 2 个子任务,这两条数据现在被划分到了两个子任务中,每个子任务只收到了 1 条数据,所以不满足输出数据的条件。
c
- 此时发现控制台输出数据了,即第一个子任务的两条数据:

d
- 此时发现控制台又输出数据了,即第二个子任务的两条数据:

全部评论(0)