Flink - State状态详解4(KeyedState样例3:ListState实现订单数据补全)
作者:hangge | 2025-04-08 09:33
三、KeyedState 样例 3:使用 ListState 实现订单数据补全(双流 Join)
1,需求说明
大致需求是这样的,某外卖平台需要开发一个实时订单消息推送功能,当用户下单,并且成功支付后向商家推送一条消息。
2,实现逻辑
(1)由于下单数据是一个数据流,支付数据是另外一个数据流。
订单数据流:{"pid":"1001","pname":"n1"}
支付数据流:{"pid":"1001","pstatus":"success"}
(2)这个时候就需要对两个数据流进行关联了,当同一个订单相关的数据都到齐之后向外推送消息。针对这个需求,我们计划使用 ListState 实现。
(3)由于数据格式是 JSON 格式的,所以我们还需要引入 fastjson 依赖:
<!-- JSON 依赖 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
3,样例代码
(1)下面是 Scala 语言代码:
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
object KeyedState_OrderDataDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val orderText = env.socketTextStream("192.168.121.128", 9998)
val payText = env.socketTextStream("192.168.121.128", 9999)
import org.apache.flink.api.scala._
//解析订单数据流
val orderTupleData = orderText.map(line => {
val orderJsonObj = JSON.parseObject(line)
val pid = orderJsonObj.getString("pid")
val pname = orderJsonObj.getString("pname")
(pid, pname)
})
//解析支付数据流
val payTupleData = payText.map(line => {
val payJsonObj = JSON.parseObject(line)
val pid = payJsonObj.getString("pid")
val pstatus = payJsonObj.getString("pstatus")
(pid, pstatus)
})
//针对两个流进行分组+connect连接(也可以先对两个流分别调用keyBy,再调用connect,效果一样)
orderTupleData.connect(payTupleData)
.keyBy("_1","_1")//field1表示第1个流里面的分组字段,field2表示第2个流里面的分组字段
.process(new KeyedCoProcessFunction[String,(String,String),(String,String)
,(String,String,String)] {
//声明两个ListState类型的状态变量,分别存储订单数据流和支付数据流
/**
* 注意:针对这个业务需求,pid在一个数据流中是不会重复的,其实使用ValueState也是可以的,
* 因为在这里已经通过keyBy基于pid对数据分组了,所以只需要在状态中存储pname或者pstatus即可。
* 但如果pid数据在一个数据流里会重复,那么就必须要使用ListState,这样才能存储指定pid多条数据
*/
private var orderDataState: ListState[(String,String)] = _
private var payDataState: ListState[(String,String)] = _
override def open(parameters: Configuration): Unit = {
//注册状态
val orderListStateDesc = new ListStateDescriptor[(String,String)](
"orderDataState",
classOf[(String, String)]
)
val payListStateDesc = new ListStateDescriptor[(String,String)](
"payDataState",
classOf[(String, String)]
)
orderDataState = getRuntimeContext.getListState(orderListStateDesc)
payDataState = getRuntimeContext.getListState(payListStateDesc)
}
//处理订单数据流
override def processElement1(orderTup: (String, String),
ctx: KeyedCoProcessFunction[String, (String, String),
(String, String), (String, String, String)]#Context,
out: Collector[(String, String, String)]): Unit = {
//获取当前pid对应的支付数据流,关联之后输出数据,(可能是支付数据先到)
payDataState.get().forEach(payTup=>{
out.collect((orderTup._1,orderTup._2,payTup._2))
})
//将本次接收到的订单数据添加到状态中,便于和支付数据流中的数据关联
orderDataState.add(orderTup)
}
//处理支付数据流
override def processElement2(payTup: (String, String),
ctx: KeyedCoProcessFunction[String, (String, String),
(String, String), (String, String, String)]#Context,
out: Collector[(String, String, String)]): Unit = {
//获取当前pid对应的订单数据流,关联之后输出数据,(可能是订单数据先到)
orderDataState.get().forEach(orderTup=>{
out.collect((orderTup._1,orderTup._2,payTup._2))
})
//将本次接收到的订单数据添加到状态中,便于和订单数据流中的数据关联
payDataState.add(payTup)
}
}).print()
env.execute("KeyedState_OrderDataDemo")
}
}
(2)下面是使用 Java 语言实现同样功能:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
public class KeyedStateOrderDataDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取订单和支付数据流
DataStream<String> orderText = env.socketTextStream("192.168.121.128", 9998);
DataStream<String> payText = env.socketTextStream("192.168.121.128", 9999);
// 解析订单数据流
DataStream<Tuple2<String, String>> orderTupleData = orderText
.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String s) throws Exception {
JSONObject orderJsonObj = JSON.parseObject(s);
String pid = orderJsonObj.getString("pid");
String pname = orderJsonObj.getString("pname");
return new Tuple2<>(pid, pname);
}
});
// 解析支付数据流
DataStream<Tuple2<String, String>> payTupleData = payText
.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String s) throws Exception {
JSONObject payJsonObj = JSON.parseObject(s);
String pid = payJsonObj.getString("pid");
String pstatus = payJsonObj.getString("pstatus");
return new Tuple2<>(pid, pstatus);
}
});
// 连接两个流并处理
orderTupleData.connect(payTupleData)
.keyBy(order -> order.f0, pay -> pay.f0) // 按 pid 分组
.process(new KeyedCoProcessFunction<String, Tuple2<String, String>,
Tuple2<String, String>, Tuple3<String, String, String>>() {
private transient ListState<Tuple2<String, String>> orderDataState;
private transient ListState<Tuple2<String, String>> payDataState;
@Override
public void open(Configuration parameters) {
// 注册状态
ListStateDescriptor<Tuple2<String, String>> orderListStateDesc =
new ListStateDescriptor<>("orderDataState",
Types.TUPLE(Types.STRING, Types.STRING));
ListStateDescriptor<Tuple2<String, String>> payListStateDesc =
new ListStateDescriptor<>("payDataState",
Types.TUPLE(Types.STRING, Types.STRING));
orderDataState = getRuntimeContext().getListState(orderListStateDesc);
payDataState = getRuntimeContext().getListState(payListStateDesc);
}
@Override
public void processElement1(Tuple2<String, String> orderTup,
Context ctx,
Collector<Tuple3<String, String, String>> out)
throws Exception {
// 处理订单数据流
for (Tuple2<String, String> payTup : payDataState.get()) {
out.collect(new Tuple3<>(orderTup.f0, orderTup.f1, payTup.f1));
}
orderDataState.add(orderTup);
}
@Override
public void processElement2(Tuple2<String, String> payTup,
Context ctx,
Collector<Tuple3<String, String, String>> out)
throws Exception {
// 处理支付数据流
for (Tuple2<String, String> orderTup : orderDataState.get()) {
out.collect(new Tuple3<>(orderTup.f0, orderTup.f1, payTup.f1));
}
payDataState.add(payTup);
}
}).print();
env.execute("KeyedState_OrderDataDemo");
}
}
4,运行测试
(1)我们首先打开两个终端,分别运行如下命令来启动两个 TCP socket:
nc -lk 9998 nc -lk 9999
(2)然后运行 Flink 程序。待程序启动后:
- 在 socket 9998 中输入如下内容模拟产生订单数据:
{"pid":"1001","pname":"n1"}
- 在 socket 9999 中输入如下内容模拟产生支付数据:
{"pid":"1003","pstatus":"success"}
- 由于此时两条数据不是同一个订单的,所以程序没有任何输出。
{"pid":"1003","pname":"n3"}
- 此时可以看到控制台输出一条数据:
(4)接下来继续在 socket 99999 中模拟产生支付数据,实现了订单数据流和支付数据流的数据关联。
{"pid":"1001","pstatus":"success"}
- 此时可以看到控制台输出一条数据:

附:样例存在问题和解决办法
1,存在的问题
(1)这个案例中两个数据流的数据会一直存储在状态中,随着时间的增长,状态会越来越多,状态如果是存储在内存中的话,内存可能就扛不住了,最终导致内存溢出。
(2)例如上面测试结束后,我们继续不断输入相同的支付数据:
{"pid":"1001","pstatus":"success"}
{"pid":"1001","pstatus":"success"}
{"pid":"1001","pstatus":"success"}
{"pid":"1001","pstatus":"success"}
- 可以看到控制台输出内容如下,说明两个数据流的数据是一直存在状态中,无论是否关联上。
2,两种解决方案
(1)一种方案是我们自己基于业务层面从状态中清理掉不用的数据,例如两份数据 join 到一起之后,就删除状态中的数据,这样可以保证状态中的数据不会一直无限递增。
(2)另一种方案是状态设置一个失效机制,官方提供的有一个 TTL 机制,可以给状态设置一个生存时间 ,过期自动删除,这个 TTL 机制我们后面再具体分析。
全部评论(0)