返回 导航

大数据

hangge.com

Flink - State状态详解6(OperatorState样例1:ListState实现批量输出功能)

作者:hangge | 2025-04-11 08:39

六、OperatorState 样例 1:ListState 实现有状态的批量输出功能

1,需求说明

   我们想要实现一个批量输出的功能,此时可以考虑在 Sink 组件内部定义一个缓存,但是还要保证数据一定会输出到外部系统。

2,实现逻辑

   这个时候就需要借助于状态实现了,通过 CheckpointedFunction 接口的 snapshotState 方法定期将批量缓存的数据保存到状态中,如果程序出现了故障,重启后还可以从状态中将未输出的数据读取到缓存中,继续输出到外部系统。

3,样例代码

(1)首先我们自定义一个 sink 实现批量输出,里面使用 ListState 保存状态数据,下面是 Scala 语言代码,:
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import scala.collection.mutable.ListBuffer

class MyBufferSink extends SinkFunction[(String,Int)] with CheckpointedFunction{
  //声明一个ListState类型的状态变量
  private var checkpointedState: ListState[(String, Int)] = _

  //定义一个本地缓存
  private val bufferElements = ListBuffer[(String, Int)]()

  /**
   * Sink的核心处理逻辑,将接收到的数据输出到外部系统
   * 接收到一条数据,这个方法就会执行一次
   * @param value
   */
  override def invoke(value: (String, Int)): Unit = {
    //将接收到的数据保存到本地缓存中
    bufferElements += value
    //当本地缓存大小到达一定阈值时,将本地缓存中的数据一次性输出到外部系统
    if (bufferElements.size == 2) {
      println("======start======")
      for (element <- bufferElements) {
        println(element)
      }
      println("======end======")
      //清空本地缓存中的数据
      bufferElements.clear()
    }
  }

  /**
   * 将本地缓存中的数据保存到状态中,在执行checkpoint时,会将状态中的数据持久化到外部存储中
   * @param context
   */
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    //将上次写入到状态中的数据清空
    checkpointedState.clear()
    //将最新的本地缓存中的数据写入到状态中
    for (element <- bufferElements) {
      checkpointedState.add(element)
    }
  }

  /**
   * 初始化或者恢复状态
   * @param context
   */
  override def initializeState(context: FunctionInitializationContext): Unit = {
    //注册状态
    val descriptor = new ListStateDescriptor[(String, Int)](
      "buffered-elements",
      classOf[(String,Int)]
    )
    //此时借助于context获取OperatorStateStore,进而获取ListState
    checkpointedState = context.getOperatorStateStore.getListState(descriptor)

    // 如果是重启任务,需要从外部存储中读取状态数据并写入到本地缓存中
    if(context.isRestored) {
      checkpointedState.get().forEach(e=>{
        bufferElements += e
      })
    }
  }
}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object OperatorState_MyBufferSinkDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //设置并行度为2
    env.setParallelism(2)

    val text = env.socketTextStream("192.168.121.128", 9999)

    import org.apache.flink.api.scala._
    text.flatMap(_.split(" "))
      .map((_, 1))
      .addSink(new MyBufferSink())

    env.execute("OperatorState_MyBufferSinkDemo")
  }
}

(2)下面是使用 Java 语言实现同样自定义 Sink
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

public class MyBufferSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {

    // 声明一个 ListState 类型的状态变量
    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    // 定义一个本地缓存
    private final List<Tuple2<String, Integer>> bufferElements = new ArrayList<>();

    /**
     * Sink 的核心处理逻辑,将接收到的数据输出到外部系统
     * 接收到一条数据,这个方法就会执行一次
     *
     * @param value
     */
    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        // 将接收到的数据保存到本地缓存中
        bufferElements.add(value);

        // 当本地缓存大小到达一定阈值时,将本地缓存中的数据一次性输出到外部系统
        if (bufferElements.size() == 2) {
            System.out.println("======start======");
            for (Tuple2<String, Integer> element : bufferElements) {
                System.out.println(element);
            }
            System.out.println("======end======");
            // 清空本地缓存中的数据
            bufferElements.clear();
        }
    }

    /**
     * 将本地缓存中的数据保存到状态中,在执行 checkpoint 时,会将状态中的数据持久化到外部存储中
     *
     * @param context
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // 将上次写入到状态中的数据清空
        checkpointedState.clear();
        // 将最新的本地缓存中的数据写入到状态中
        for (Tuple2<String, Integer> element : bufferElements) {
            checkpointedState.add(element);
        }
    }

    /**
     * 初始化或者恢复状态
     *
     * @param context
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // 注册状态
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
                new ListStateDescriptor<>("buffered-elements", Types.TUPLE(Types.STRING, Types.INT));

        // 此时借助于 context 获取 OperatorStateStore,进而获取 ListState
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        // 如果是重启任务,需要从外部存储中读取状态数据并写入到本地缓存中
        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferElements.add(element);
            }
        }
    }
}
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OperatorState_MyBufferSinkDemo {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置并行度为 2
        env.setParallelism(2);

        // 从 Socket 中读取数据流
        DataStream<String> text = env.socketTextStream("192.168.121.128", 9999);

        // 数据处理逻辑
        text.flatMap((String line, org.apache.flink.util.Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split(" ")) {
                        out.collect(new Tuple2<>(word, 1));
                    }
                })
                .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))
                .addSink(new MyBufferSink());

        // 执行任务
        env.execute("OperatorState_MyBufferSinkDemo");
    }
}

4,运行测试

(1)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(2)然后运行 Flink 程序。待程序启动后,我们在该终端中输入如下内容。
a
b

(3)接着我们再产生一条数据:
c

(4)接着我们再产生一条数据:
d
评论

全部评论(0)

回到顶部