Flink - State in Depth, Part 3 (KeyedState Example 2: Live-Room Statistics with MapState)
Author: hangge | 2025-04-07 08:41
II. KeyedState Example 2: Live-Room Statistics with MapState
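1. Requirements
We need to collect statistics for every streamer on the platform, covering the gifts, likes, follows, and similar metrics received in a live room. The live room is the unit of aggregation.
2. Implementation approach
(1) Each time a streamer goes live, a new live-room ID (vid) is generated, and all metrics must be aggregated per vid. The metrics span several dimensions, for example the gift total, the number of likes, and the number of follows. This kind of data is a good fit for MapState: the stream is keyed by vid, so each live room gets its own MapState holding that room's metrics.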
Gift event: {"type":"gift","uid":"1001","vid":"29901","value":"100"}
Follow event: {"type":"follow","uid":"1001","vid":"29901"}
Like event: {"type":"like","uid":"1001","vid":"29901"}
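To make the storage layout concrete, here is a minimal plain-Java sketch that does not use Flink at all. A nested HashMap stands in for the keyed state: the outer key is the vid, and the inner map plays the role of that room's MapState (metric type to running total). In a real Flink job the outer map is implicit, because keyBy(vid) scopes the MapState to the current key. The class and method names (RoomMetricsSketch, add, metricsOf) are hypothetical and chosen only for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class RoomMetricsSketch {
    // Outer key: live-room vid. Inner map: metric type -> running total.
    private final Map<String, Map<String, Integer>> metricsByRoom = new HashMap<>();

    /** Adds amount to one metric of one room and returns the new running total. */
    public int add(String vid, String type, int amount) {
        return metricsByRoom
                .computeIfAbsent(vid, k -> new HashMap<>())
                .merge(type, amount, Integer::sum);
    }

    /** Returns a read-only view of one room's metrics (empty if the room is unknown). */
    public Map<String, Integer> metricsOf(String vid) {
        return Collections.unmodifiableMap(
                metricsByRoom.getOrDefault(vid, Collections.emptyMap()));
    }

    public static void main(String[] args) {
        RoomMetricsSketch sketch = new RoomMetricsSketch();
        sketch.add("29901", "gift", 100);
        sketch.add("29901", "like", 1);
        sketch.add("29901", "gift", 100);
        sketch.add("29902", "like", 1);
        // Room 29901 has accumulated two gifts of 100 each
        System.out.println(sketch.metricsOf("29901").get("gift")); // 200
        // Room 29902 is tracked independently of room 29901
        System.out.println(sketch.metricsOf("29902").get("like")); // 1
    }
}
```

Each room accumulates independently, which is the behavior the per-key MapState provides in the Flink job.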
(2) The events arrive as JSON, so we also need to add the fastjson dependency:
<!-- JSON dependency -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.68</version>
</dependency>
3. Sample code
(1) The Scala version:
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

object KeyedState_VideoDataDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("192.168.121.128", 9999)

    // Brings the implicit TypeInformation required by the Scala API into scope
    import org.apache.flink.api.scala._
    text.map(line => {
      val videoJsonData = JSON.parseObject(line)
      val vid = videoJsonData.getString("vid")
      val videoType = videoJsonData.getString("type")
      // An if statement would work here as well
      videoType match {
        case "gift" => {
          val value = videoJsonData.getIntValue("value")
          (vid, videoType, value)
        }
        case _ => (vid, videoType, 1)
      }
    }).keyBy(_._1) // Note: a flatMap operator would also work from here on; this example uses the lower-level process API instead
      .process(new KeyedProcessFunction[String, (String, String, Int), (String, String, Int)] {
        // Declare a MapState variable that stores the metrics of the current live room.
        // The MapState key is gift/follow/like; the value is the accumulated total.
        private var videoDataState: MapState[String, Int] = _

        override def open(parameters: Configuration): Unit = {
          // Register the state
          val mapStateDesc = new MapStateDescriptor[String, Int](
            "videoDataState", // state name
            classOf[String],  // key type
            classOf[Int]      // value type
          )
          videoDataState = getRuntimeContext.getMapState(mapStateDesc)
        }

        override def processElement(value: (String, String, Int),
                                    ctx: KeyedProcessFunction[String, (String, String, Int),
                                      (String, String, Int)]#Context,
                                    out: Collector[(String, String, Int)]): Unit = {
          val videoType = value._2
          var num = value._3
          // If the state already holds a total for this metric, add it to the new amount
          if (videoDataState.contains(videoType)) {
            num += videoDataState.get(videoType)
          }
          // Update the state
          videoDataState.put(videoType, num)
          out.collect((value._1, videoType, num))
        }
      }).print()

    env.execute("KeyedState_VideoDataDemo")
  }
}
(2) The same functionality implemented in Java:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class KeyedStateVideoDataDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read the socket text stream
        DataStream<String> text = env.socketTextStream("192.168.121.128", 9999);

        text.map(new MapFunction<String, Tuple3<String, String, Integer>>() {
                @Override
                public Tuple3<String, String, Integer> map(String s) throws Exception {
                    JSONObject videoJsonData = JSON.parseObject(s);
                    String vid = videoJsonData.getString("vid");
                    String videoType = videoJsonData.getString("type");
                    // Gift events carry their own value; every other event counts as 1
                    if ("gift".equals(videoType)) {
                        int value = videoJsonData.getIntValue("value");
                        return new Tuple3<>(vid, videoType, value);
                    } else {
                        return new Tuple3<>(vid, videoType, 1);
                    }
                }
            })
            .keyBy(tuple -> tuple.f0) // key the stream by vid
            .process(new KeyedProcessFunction<String, Tuple3<String, String, Integer>,
                    Tuple3<String, String, Integer>>() {
                // Metrics of the current live room: gift/follow/like -> accumulated total
                private transient MapState<String, Integer> videoDataState;

                @Override
                public void open(Configuration parameters) throws Exception {
                    // Register the state
                    MapStateDescriptor<String, Integer> mapStateDesc =
                            new MapStateDescriptor<>(
                                    "videoDataState", // state name
                                    String.class,     // key type
                                    Integer.class     // value type
                            );
                    videoDataState = getRuntimeContext().getMapState(mapStateDesc);
                }

                @Override
                public void processElement(Tuple3<String, String, Integer> value,
                                           Context ctx,
                                           Collector<Tuple3<String, String, Integer>> out)
                        throws Exception {
                    String videoType = value.f1;
                    int num = value.f2;
                    // If the state already holds a total for this metric, add it to the new amount
                    if (videoDataState.contains(videoType)) {
                        num += videoDataState.get(videoType);
                    }
                    // Update the state
                    videoDataState.put(videoType, num);
                    out.collect(new Tuple3<>(value.f0, videoType, num));
                }
            }).print();

        env.execute("KeyedState_VideoDataDemo");
    }
}
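4. Running the test
(1) First, start a TCP socket listening on port 9999 by running the following command in a terminal on the machine the program connects to (192.168.121.128 in the code above):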
nc -lk 9999
(2) Then run the Flink program. Once it has started, type the following lines into that terminal:
{"type":"gift","uid":"1001","vid":"29901","value":"100"}
{"type":"follow","uid":"1001","vid":"29901"}
{"type":"like","uid":"1001","vid":"29902"}
{"type":"like","uid":"1001","vid":"29901"}
{"type":"gift","uid":"1001","vid":"29901","value":"100"}
{"type":"like","uid":"1001","vid":"29904"}
(3) The console then prints one record per input line, each carrying the running total of that metric for the room.
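As a rough cross-check of what to expect, the sketch below replays the six test lines through the same accumulation logic in plain Java, with a nested HashMap standing in for Flink's keyed MapState. The class and method names (ExpectedOutputSketch, replay) are illustrative only and are not part of the original program; the events are written as {vid, type, amount} triples, which is what the map step produces. In the real job, print() also prefixes each line with a subtask index when parallelism is greater than 1, and lines belonging to different rooms may appear in a different order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExpectedOutputSketch {
    /** Replays {vid, type, amount} events and returns the tuples the job would emit. */
    public static List<String> replay(String[][] events) {
        // Outer key: vid (the keyBy key). Inner map: stand-in for that room's MapState.
        Map<String, Map<String, Integer>> stateByRoom = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String[] event : events) {
            String vid = event[0];
            String type = event[1];
            int amount = Integer.parseInt(event[2]);
            Map<String, Integer> roomState =
                    stateByRoom.computeIfAbsent(vid, k -> new HashMap<>());
            // Same effect as contains/get/put in processElement: add to the stored total
            int total = roomState.merge(type, amount, Integer::sum);
            emitted.add("(" + vid + "," + type + "," + total + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        // The six test lines after the map step: gifts carry their value, other events count as 1
        String[][] events = {
            {"29901", "gift", "100"},
            {"29901", "follow", "1"},
            {"29902", "like", "1"},
            {"29901", "like", "1"},
            {"29901", "gift", "100"},
            {"29904", "like", "1"},
        };
        replay(events).forEach(System.out::println);
    }
}
```

Under these assumptions, the second gift for room 29901 is reported as (29901,gift,200), while the like events for rooms 29902 and 29904 each start from 1 because every room has its own state.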
