返回 导航

大数据

hangge.com

Flink - State状态详解8(生存时间TTL)

作者:hangge | 2025-04-14 11:05
    默认情况下 State 数据会一直存在,如果存储了过多状态数据,可能会导致内存溢出(针对 HashMapStateBackend)。因此从 Flink 1.6 版本开始引入了 State TTL 特性。类似于 Redis 中的 TTL 机制,超时自动删除。下面我通过样例进行演示。

1,基本介绍

(1)TTL 特性可以支持对 KeyedState 中过期状态数据的自动清理,当状态中的某条数据到了过期时间,这条数据会被 Flink 自动删除,这样就有效解决了状态数据在无干预情况下无限增长导致内存溢出的问题。
  • 例如:我们在实时统计一段时间内的数据指标的时候,需要在状态中做去重,但是过了这段时间之后,之前的状态数据就没有用了,这样就可以用状态的 ttl 机制实现自动清理,当然我们也可以通过代码逻辑清空状态中的历史数据。

(2)而针对 OperatorState 类型而言,基本上不需要自动清理。
  • 例如:以我们之前开发的 OperatorState_MyBufferSinkDemo 代码为例(点击查看)。在 MyBufferSink 中,平时处理的数据是存储在任务内部的本地缓存中。只有在触发 checkpoint 的时候,才会把本地缓存中的数据写入到状态中,当下次触发 checkpoint 的时候我们会在代码中将之前的状态数据清空。所以状态中存储的数据不会一直增长,这样就没必要设置了状态的 TTL 了。

(3)数据过期判断依据:“上次修改的时间戳 + 我们设置的状态 TTL > 当前时间戳”。如果满足这个条件,那么这条状态数据就过期了。

2,实现原理

(1)本质上来讲,状态的 TTL 功能其实就是给每个 Keyed State 增加了一个“时间戳”,而 Flink 在状态创建、写入或读取的时候可以更新这个时间戳,并且判断状态是否过期。如果状态过期,还会根据可见性参数,来决定是否返回已过期但还未清理的状态。
  • 状态的清理并不是即时的,而是使用了一种 Lazy 的算法来实现,从而减少状态清理对性能的影响。

(2)举个例子,我要将一个 String 类型的数据存储到 ValueState 类型的状态中:
  • 如果没有设置状态的 TTL ,则直接将 String 类型的数据存储到 ValueState 中。
  • 如果设置了状态的 TTL,则 Flink 会将 <String, Long> 这种结构的数据存储到 ValueState 中,其中 Long 为时间戳,用于判断状态是否过期。

(3)在代码层面进行分析的话,是这样的:
  • 如果没有设置状态的 TTL,我们存储 String 类型的数据使用的是 ValueState
  • 当我们设置了状态的 TTL,那么就需要使用 ValueState 对应的 TtlValueState

3,使用样例

(1)下面时一个使用 Scala 语言编写的 TTL 使用样例:
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.StateTtlConfig.TtlTimeCharacteristic
import org.apache.flink.api.common.state.{StateTtlConfig, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object StateTTLDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //开启Checkpoint
    env.enableCheckpointing(1000*10)//为了观察方便,在这里设置为10秒执行一次

    val text = env.socketTextStream("192.168.121.128", 9999)
    import org.apache.flink.api.scala._
    val keyedStream = text.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    keyedStream.map(new RichMapFunction[(String,Int),(String,Int)] {
      //声明一个ValueState类型的状态变量,存储单词出现的总次数
      private var countState: ValueState[Int] = _

      /**
       * 任务初始化的时候这个方法执行一次
       * @param parameters
       */
      override def open(parameters: Configuration): Unit = {
        //设置TTL机制相关配置
        val ttlConfig = StateTtlConfig
          //1:指定状态的生存时间
          .newBuilder(Time.seconds(10))
          //2:指定什么时候触发更新延长状态的TTL时间
          //OnCreateAndWrite:仅在创建和写入时触发TTL时间更新延长。
          //OnReadAndWrite:表示在读取的时候也会触发(包括创建和写入)
          .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
          //3:过期数据是否可访问
          //NeverReturnExpired:表示永远不会返回过期数据(可能会存在数据已过期但是还没有被清理)
          //ReturnExpiredIfNotCleanedUp:表示数据只要没有被删除,就算过期了也可以被访问
          .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
          //4:TTL的时间语义
          //判断数据是否过期时使用的时间语义,默认使用处理时间,目前只支持这一种
          .setTtlTimeCharacteristic(TtlTimeCharacteristic.ProcessingTime)
          //5:过期数据删除策略
          //cleanupFullSnapshot:全量删除
          //此时只有当任务从checkpoint或者savepoint恢复时才会删除所有过期数据
          //这种方式其实并不能真正解决使用HashMapStateBackend时内存压力问题,只有定时重启恢复才可解决
          //注意:这种方式不适合Rocksdb中的增量Checkpoint方式
          //.cleanupFullSnapshot()
          //cleanupIncrementally:针对内存的增量删除方式
          //增量删除策略只支持基于内存的HashMapStateBackend,不支持EmbeddedRocksDBStateBackend
          //它的实现思路是在所有数据上维护一个全局迭代器。当遇到某些事件(如状态访问)时会触发增量删除
          //cleanupSize=100 和 runCleanupForEveryRecord=true 表示
          //每访问一个状态数据就会向前迭代遍历100条数据并删除其中过期的数据
          .cleanupIncrementally(100,true)
          //针对Rocksdb的增量删除方式
          //当Rocksdb在做Compact(合并)的时候删除过期数据
          //每Compact(合并)1000个Entry之后,会从Flink中查询当前时间戳,用于判断这些数据是否过期
          //.cleanupInRocksdbCompactFilter(1000)
          .build
        //注册状态
        val valueStateDesc = new ValueStateDescriptor[Int](
          "countState",//指定状态名称
          classOf[Int]//指定状态中存储的数据类型
        )
        //开启TTL机制
        //注意:开启TTL机制会增加状态的存储空间,因为在存储状态时还需要将状态的上次修改时间一起存储
        valueStateDesc.enableTimeToLive(ttlConfig)
        countState = getRuntimeContext.getState(valueStateDesc)
      }

      override def map(value: (String, Int)): (String,Int) = {
        //从状态中获取这个key之前出现的次数
        var lastNum = countState.value()
        val currNum = value._2
        //如果这个key的数据是第一次过来,则将之前出现的次数初始化为0
        if(lastNum == null){
          lastNum = 0
        }
        //汇总出现的次数
        val sum = lastNum+currNum
        //更新状态
        countState.update(sum)
        //返回单词及单词出现的总次数
        (value._1,sum)
      }

    }).print()

    env.execute("StateTTLDemo")
  }
}

(2)下面是使用 Java 语言实行同样功能:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class StateTTLDemoJava {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 开启 Checkpoint,每 10 秒执行一次
        env.enableCheckpointing(10000);

        // 获取 Socket 输入
        DataStream<String> text = env.socketTextStream("192.168.121.128", 9999);

        // 处理输入数据,分词并按单词分组
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
                            throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .keyBy(tuple -> tuple.f0);

        // 应用自定义 RichMapFunction
        keyedStream.map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

            // 声明一个 ValueState 类型的状态变量,用于存储单词出现的总次数
            private transient ValueState<Integer> countState;

            @Override
            public void open(Configuration parameters) throws Exception {
                // 配置 TTL
                StateTtlConfig ttlConfig = StateTtlConfig
                        .newBuilder(Time.seconds(10)) // 设置状态存活时间
                        // 状态更新方式
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        // 过期状态不可访问s
                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                        // 使用处理时间
                        .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
                        // 增量删除策略
                        .cleanupIncrementally(100, true)
                        .build();

                // 注册状态
                ValueStateDescriptor<Integer> valueStateDesc = new ValueStateDescriptor<>(
                        "countState", // 状态名称
                        Integer.class // 状态存储的数据类型
                );
                valueStateDesc.enableTimeToLive(ttlConfig); // 开启 TTL
                countState = getRuntimeContext().getState(valueStateDesc);
            }

            @Override
            public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                // 从状态中获取之前的值
                Integer lastNum = countState.value();
                if (lastNum == null) {
                    lastNum = 0;
                }
                Integer currNum = value.f1;
                Integer sum = lastNum + currNum;

                // 更新状态
                countState.update(sum);

                // 返回单词及其总次数
                return new Tuple2<>(value.f0, sum);
            }
        }).print();

        // 执行任务
        env.execute("StateTTLDemo");
    }
}

4,运行测试

(1)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(2)然后运行 Flink 程序。待程序启动后,我们在该终端中输入如下内容。
a b
b

(3)由于我们设置了状态的 TTL 时间是 10 秒,所以 10 秒之后,再模拟产生一条数据 a
a

(4)立刻再模拟产生一条数据 a
a
评论

全部评论(0)

回到顶部