返回 导航

大数据

hangge.com

Flink - State状态详解7(OperatorState样例2:BroadcastState实现双流连接)

作者:hangge | 2025-04-12 10:43

七、OperatorState 样例 1:使用 BroadcastState 实现双流连接

1,需求说明

(1)针对 BroadcastState 的使用,一个典型的应用案例就是两个流连接的场景。
  • 假设其中一个数据流是“事件数据流”,它属于普通的数据流,里面是一些用户行为数据。
  • 另外一个数据流是“配置数据流”,它不是普通的数据流,它是广播数据流,里面是一些映射关系数据。
  • 需求是使用配置数据流中的映射关系数据去完善事件数据流中的用户行为数据。

(2)这个需求来源于某直播平台,这个直播平台会在多个国家运营,如果每一个国家都使用一套运营策略,会比较麻烦,运营成本比较高,意义也不是特别大。为了方便运营管理,所以平台内部提出了大区这个概念,可以将一些国家划分到同一个大区里面,同一个大区使用相同的运营策略。
  • 那么对应的就有国家和大区之间的映射关系,这个映射关系不是一成不变的,他会随着平台的发展而发生变化。
  • 在用户行为数据中针对用户的基础数据里面只有用户所属的国家信息,没有包含大区信息,因为国家和大区的关系是可变的,但是在做报表统计的时候,是需要以大区维度进行统计的。
  • 所以针对实时报表这种场景,就需要对用户行为数据中的国家信息进行实时关联转换了。

2,实现逻辑

(1)我们可以通过下图加深理解,这个图里面有两个实时数据流。
  • 上面的事件数据流里面是用户的行为数据。
  • 下面的配置数据流里面是国家和大区之间的最新映射关系。

(2)在程序中需要将配置数据流广播出去,转换为 BroadcastState,然后将两份数据流连接到一起,这样在处理事件数据流中的用户行为数据的时候,可以获取到 BroadcastState,基于这份状态数据对用户行为数据中的国家信息进行转换。

3,样例代码(Scala 语言)

(1)针对事件数据流在实际工作中基本上是来源于 kafka 的,在这里为了演示方便,我们来开发一个自定义的 Source 模拟产生数据。
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import scala.util.Random

/**
 * 事件数据流-自定义Source
 */
class MyStreamSource extends RichSourceFunction[String]{
  var isRunning = true

  /**
   * 初始化方法,只执行一次
   *
   * @param parameters
   */
  override def open(parameters: Configuration): Unit = {
  }

  /**
   * Source的核心方法,负责源源不断的产生数据
   * @param ctx
   */
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    while (isRunning){
      //{"dt":"2026-01-01 10:11:11","countryCode":"US",
      // "data":[{"type":"s1","score":0.3,"level":"A"},{"type":"s2","score":0.2,"level":"B"}]}
      val time = sdf.format(new Date)
      val line_prefix = "{\"dt\":\""+time+"\",\"countryCode\":\""
      val line_suffix = "\",\"data\":[{\"type\":\"s1\",\"score\":0.3,\"level\":\"A\"}," +
        "{\"type\":\"s2\",\"score\":0.2,\"level\":\"B\"}]}"
      val countryCodeArr = Array("US","PK","KW")
      val num = Random.nextInt(3)
      ctx.collect(line_prefix+countryCodeArr(num)+line_suffix)
      Thread.sleep(1000)//每隔1秒产生一条数据,控制一下数据产生速度
    }
  }

  /**
   * 任务停止的时候执行一次
   * 这里主要负责控制run方法中的循环
   */
  override def cancel(): Unit = {
    isRunning = false
  }

  /**
   *任务停止的时候执行一次
   * 这里主要负责关闭在open方法中创建的连接
   */
  override def close(): Unit = {

  }
}

(2)针对配置数据流在实际工作中基本上是来源于 Redis 或者 MySQL,在这里为了演示方便,我们来开发一个自定义的 Source 模拟从 Redis 中获取数据。
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import scala.collection.mutable

/**
 * 配置数据流-自定义Source
 */
class MyRedisSource extends RichSourceFunction[mutable.Map[String,String]]{
  var isRunning = true

  /**
   * 初始化方法,只执行一次
   * @param parameters
   */
  override def open(parameters: Configuration): Unit = {
    //TODO 创建Redis数据库连接
  }

  /**
   * Source的核心方法,负责源源不断的产生数据
   * @param ctx
   */
  override def run(ctx: SourceFunction.SourceContext[mutable.Map[String, String]]): Unit = {
    while (isRunning){
      //TODO 需要从Redis中获取这些映射关系
      val resMap = mutable.Map("US"->"AREA_US","PK"->"AREA_AR","KW"->"AREA_AR")
      ctx.collect(resMap)
      Thread.sleep(5000)//每隔5秒更新一次配置数据
    }
  }

  /**
   * 任务停止的时候执行一次
   * 这里主要负责控制run方法中的循环
   */
  override def cancel(): Unit = {
    isRunning = false
  }

  /**
   *任务停止的时候执行一次
   * 这里主要负责关闭在open方法中创建的连接
   */
  override def close(): Unit = {
    //TODO 关闭Redis数据库连接
  }
}

(3)程序的核心代码如下:
提示:针对这个需求,因为没有对数据流进行 keyBy 分组,如果不对配置数据流进行广播,那么在处理事件数据流中的数据的时候,每一个子任务无法获取到配置数据流中的所有映射关系,所以只能通过全量广播,这样才能让事件数据流的每一个子任务都可以获取到所有的配置数据流中的数据,最终实现数据关联转换。
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import scala.collection.JavaConverters.mapAsJavaMap
import scala.collection.mutable

/**
 * BroadcastState在两个流连接中的应用(双流Join)
 * 这个场景类似于:一个事实表(事件数据流) left join 一个维度表(配置数据流)
 */
object OperatorState_BroadcastStateDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    //构建第1个数据流:事件数据流
    val eventStream = env.addSource(new MyStreamSource)

    //构建第2个数据流:配置数据流
    val confStream = env.addSource(new MyRedisSource)

    //将配置数据流广播出去,变成广播数据流,并且注册一个MapState
    val countryAreaMapStateDescriptor = new MapStateDescriptor[String, String](
      "countryArea",
      classOf[String],
      classOf[String]
    )
    val broadcastStream = confStream.broadcast(countryAreaMapStateDescriptor)

    //将两个流进行连接
    val broadcastConnectStream = eventStream.connect(broadcastStream)

    //处理连接后的流
    broadcastConnectStream.process(
      new BroadcastProcessFunction[String, mutable.Map[String, String], String] {
        //处理事件数据流中的数据
        override def processElement(value: String,
                ctx: BroadcastProcessFunction[String, mutable.Map[String, String], String]
                  #ReadOnlyContext,
                out: Collector[String]): Unit = {
          val jsonObj = JSON.parseObject(value)
          val countryCode = jsonObj.getString("countryCode")
          //取出广播状态中的数据
          val broadcastState = ctx.getBroadcastState(countryAreaMapStateDescriptor)
          val area = broadcastState.get(countryCode)
          //任务刚开始执行的时候broadcastState中的数据为空,所以获取不到数据
          if (area != null) {
            jsonObj.put("countryCode", area)
            out.collect(jsonObj.toJSONString)
          }
        }

        //处理广播后的配置数据流中的数据
        override def processBroadcastElement(value: mutable.Map[String, String],
             ctx: BroadcastProcessFunction[String, mutable.Map[String, String], String]#Context,
             out: Collector[String]): Unit = {
          //获取BroadcastState
          val broadcastState = ctx.getBroadcastState(countryAreaMapStateDescriptor)
          //清空BroadcastState中的数据
          broadcastState.clear()
          //重新写入最新的映射关系数据
          broadcastState.putAll(mapAsJavaMap(value))
        }
      }).print()

    env.execute("OperatorState_BroadcastStateDemo")
  }
}

(4)运行程序可以看到控制台输出内容如下:

4,样例代码(Java 语言)

(1)针对事件数据流在实际工作中基本上是来源于 kafka 的,在这里为了演示方便,我们来开发一个自定义的 Source 模拟产生数据。
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

/**
 * 事件数据流-自定义Source
 */
public class MyStreamSource extends RichSourceFunction<String> {
    private volatile boolean isRunning = true;

    /**
     * 初始化方法,只执行一次
     *
     * @param parameters
     */
    @Override
    public void open(Configuration parameters) {
        // 初始化代码
    }

    /**
     * Source的核心方法,负责源源不断的产生数据
     *
     * @param ctx
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Random random = new Random();
        String[] countryCodeArr = {"US", "PK", "KW"};

        while (isRunning) {
            String time = sdf.format(new Date());
            String linePrefix = "{\"dt\":\"" + time + "\",\"countryCode\":\"";
            String lineSuffix = "\",\"data\":[{\"type\":\"s1\",\"score\":0.3,\"level\":\"A\"}," +
                    "{\"type\":\"s2\",\"score\":0.2,\"level\":\"B\"}]}";
            int num = random.nextInt(3);
            ctx.collect(linePrefix + countryCodeArr[num] + lineSuffix);
            Thread.sleep(1000); // 每隔1秒产生一条数据,控制一下数据产生速度
        }
    }

    /**
     * 任务停止的时候执行一次
     * 这里主要负责控制run方法中的循环
     */
    @Override
    public void cancel() {
        isRunning = false;
    }

    /**
     * 任务停止的时候执行一次
     * 这里主要负责关闭在open方法中创建的连接
     */
    @Override
    public void close() {
        // 关闭资源
    }
}

(2)针对配置数据流在实际工作中基本上是来源于 Redis 或者 MySQL,在这里为了演示方便,我们来开发一个自定义的 Source 模拟从 Redis 中获取数据。
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.util.HashMap;
import java.util.Map;

/**
 * 配置数据流-自定义Source
 */
public class MyRedisSource extends RichSourceFunction<Map<String, String>> {
    private volatile boolean isRunning = true;

    /**
     * 初始化方法,只执行一次
     *
     * @param parameters
     */
    @Override
    public void open(Configuration parameters) {
        // TODO 创建Redis数据库连接
    }

    /**
     * Source的核心方法,负责源源不断的产生数据
     *
     * @param ctx
     */
    @Override
    public void run(SourceContext<Map<String, String>> ctx) throws Exception {
        while (isRunning) {
            // TODO 从Redis中获取这些映射关系
            Map<String, String> resMap = new HashMap<>();
            resMap.put("US", "AREA_US");
            resMap.put("PK", "AREA_AR");
            resMap.put("KW", "AREA_AR");

            ctx.collect(resMap);
            Thread.sleep(5000); // 每隔5秒更新一次配置数据
        }
    }

    /**
     * 任务停止的时候执行一次
     * 这里主要负责控制run方法中的循环
     */
    @Override
    public void cancel() {
        isRunning = false;
    }

    /**
     * 任务停止的时候执行一次
     * 这里主要负责关闭在open方法中创建的连接
     */
    @Override
    public void close() {
        // TODO 关闭Redis数据库连接
    }
}

(3)程序的核心代码如下:
提示:针对这个需求,因为没有对数据流进行 keyBy 分组,如果不对配置数据流进行广播,那么在处理事件数据流中的数据的时候,每一个子任务无法获取到配置数据流中的所有映射关系,所以只能通过全量广播,这样才能让事件数据流的每一个子任务都可以获取到所有的配置数据流中的数据,最终实现数据关联转换。
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import java.util.Map;

/**
 * BroadcastState在两个流连接中的应用(双流Join)
 * 这个场景类似于:一个事实表(事件数据流) left join 一个维度表(配置数据流)
 */
public class OperatorStateBroadcastStateDemo {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 构建第1个数据流:事件数据流
        DataStream<String> eventStream = env.addSource(new MyStreamSource());

        // 构建第2个数据流:配置数据流
        DataStream<Map<String, String>> confStream = env.addSource(new MyRedisSource());

        // 定义并注册一个MapState
        MapStateDescriptor<String, String> countryAreaMapStateDescriptor =
                new MapStateDescriptor<>(
                        "countryArea",
                        String.class,
                        String.class
                );

        // 将配置数据流广播出去
        BroadcastStream<Map<String, String>> broadcastStream
                = confStream.broadcast(countryAreaMapStateDescriptor);

        // 将两个流进行连接
        BroadcastConnectedStream<String, Map<String, String>> broadcastConnectStream
                = eventStream.connect(broadcastStream);

        // 处理连接后的流
        broadcastConnectStream.process(
                new BroadcastProcessFunction<String, Map<String, String>, String>() {
            // 处理事件数据流中的数据
            @Override
            public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
                    throws Exception {
                JSONObject jsonObj = JSON.parseObject(value);
                String countryCode = jsonObj.getString("countryCode");
                // 从广播状态中取出数据
                ReadOnlyBroadcastState<String, String> broadcastState
                        = ctx.getBroadcastState(countryAreaMapStateDescriptor);
                String area = broadcastState.get(countryCode);
                // 如果找到对应的映射关系,则更新数据
                if (area != null) {
                    jsonObj.put("countryCode", area);
                    out.collect(jsonObj.toJSONString());
                }
            }

            // 处理广播后的配置数据流中的数据
            @Override
            public void processBroadcastElement(Map<String, String> value, Context ctx,
                                                Collector<String> out) throws Exception {
                // 获取BroadcastState
                BroadcastState<String, String> broadcastState
                        = ctx.getBroadcastState(countryAreaMapStateDescriptor);
                // 清空BroadcastState中的数据
                broadcastState.clear();
                // 写入新的映射关系数据
                broadcastState.putAll(value);
            }
        }).print();

        env.execute("OperatorState_BroadcastStateDemo");
    }
}

(4)运行程序可以看到控制台输出内容如下:
评论

全部评论(0)

回到顶部