Flink - 双十一总金额、TopN商品统计案例实操(Kafka数据实时计算、输出至Redis)
作者:hangge | 2025-03-14 08:47
一、功能说明
1,整体架构介绍
(1)下面是一个某电商网站数据大屏整体架构:
- 当用户在 PC 或 App 上提交一个订单后,业务系统会通过日志的方式记录这条订单的相关数据。
- 使用 Filebeat 这个日志采集工具采集前端业务机器上的日志数据。这里采集的日志数据其实就是用户的订单数据。
- 通过 Filebeat 将订单数据采集到 Kafka 中进行缓冲,避免数据峰值给后端的计算系统造成太大的压力。
- 使用 Flink 开发实时计算程序对 Kafka 中的数据进行处理,Flink 内部涉及对接 Kafka 数据源、数据解析、数据过滤及数据聚合之类的功能,在计算出需要的结果数据后将结果数据输出到 Redis 中。
- 通过 DataV 查询 Redis 中的数据并进行展示。在对接时会用到数据接口,需要提前封装好数据接口,在对接时直接在 DataV 中调用即可。

(2)本文样例主要实现 Flink 实时数据计算部分的核心代码,即对 Kafka 中的数据进行处理,并将计算出的结果数据输出到 Redis 中。前面的日志数据采集和后面的数据展示模块暂不实现。
2,需求描述
(1)“双十一”数据大屏中展现的指标和图表有近 20 个,这里主要计算两个核心指标。
- 第 1 个指标:统计全站“双十一”当天的实时 GMV(总的成交金额)。
- 第 2 个指标:统计实时销量 Top N 的商品品类。
(2)在此项目中需要用到的订单数据的格式如下:
- 这是一个 JSON 格式的数据,表示是用户的一个订单数据信息,其中包含订单编号、订单总金额和订单明细。
- 订单明细中存储了订单中商品的基本信息(包括商品编号、商品数量、商品单价、商品名称和商品品类)。
{
"detal": [
{
"goodsCount": 1,
"goodsName": "美的 606 升",
"goodsNo": "6002",
"goodsPrice": 100,
"goodsType": "冰箱"
},
{
"goodsCount": 2,
"goodsName": "荣耀 30Pro",
"goodsNo": "1005",
"goodsPrice": 200,
"goodsType": "手机"
}
],
"orderNo": "9fc3ffe3-3255-414e-8df8-1580cfbd2ff9",
"totalPrice": 500
}
二、程序开发(Java 版)
1,添加依赖
我们创建一个 Maven 项目,然后在 pom.xml 文件中添加 Flink、Redis、JSON 依赖。
<!-- Flink 依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.16.0</version>
</dependency>
<!-- Redis 依赖 -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<!-- JSON 依赖 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
2,自定义 Sink 代码
提示:由于 Flink 内置的 RedisSink 组件无法满足需求,所以这里自定义了 RedisSink 组件。
(1)下面是用户输出实时总成交金额的自定义 Sink 代码:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;
public class GmvRedisSink extends RichSinkFunction<Long> {
private String host;
private int port;
private String password;
private String key;
private Jedis jedis = null;
public GmvRedisSink(String host, int port, String password, String key) {
this.host = host;
this.port = port;
this.password = password;
this.key = key;
}
/**
* 初始化方法,只执行一次
*
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
this.jedis = new Jedis(host, port);
if (password!= null &&!password.isEmpty()) {
jedis.auth(password);
}
}
/**
* 核心代码,来一条数据此方法会执行一次
*
* @param value
* @param context
* @throws Exception
*/
@Override
public void invoke(Long value, Context context) throws Exception {
//对 GMV 数据进行递增操作
jedis.incrBy(key, value);
}
/**
* 任务停止时会先调用此方法
* 适合关闭资源链接
*
* @throws Exception
*/
@Override
public void close() throws Exception {
//关闭链接
if (jedis != null) {
jedis.close();
}
}
}
(2)下面是用户输出实时销量 Top N 的商品品类的自定义 Sink 代码:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;
public class TopNRedisSink extends RichSinkFunction<Tuple2<String, Long>> {
private String host;
private int port;
private String password;
private String key;
private Jedis jedis = null;
public TopNRedisSink(String host, int port, String password, String key) {
this.host = host;
this.port = port;
this.password = password;
this.key = key;
}
/**
* 初始化方法,只执行一次
*
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
this.jedis = new Jedis(host, port);
if (password != null &&!password.isEmpty()) {
jedis.auth(password);
}
}
/**
* 核心代码,来一条数据此方法会执行一次
*
* @param value
* @param context
* @throws Exception
*/
@Override
public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
//给 sortedset 中的指定元素递增添加分值
jedis.zincrby(key, value.f1, value.f0);
}
/**
* 任务停止时会先调用此方法
* 适合关闭资源链接
*
* @throws Exception
*/
@Override
public void close() throws Exception {
//关闭链接
if (jedis != null) {
jedis.close();
}
}
}
3,Flinl 核心程序代码
代码具体流程是:
- 从 Kafka 中读取订单数据,使用 FlatMap 将订单中的商品信息提取并转换为 Tuple3。
- 过滤掉异常数据(商品数量小于等于 0)。
- 对订单数据进行两项实时统计:GMV 统计和实时销量 Top N 的商品品类统计。
- 将 GMV 和 Top N 的统计结果写入 Redis 中。
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class StreamDataCalcJava {
public static void main(String[] args) throws Exception {
//获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//指定 KafkaSource 相关配置
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("192.168.121.128:9092")
.setTopics("order_detail")
.setGroupId("con")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
//指定 Kafka 作为 Source
DataStreamSource<String> text = env.fromSource(kafkaSource,
WatermarkStrategy.noWatermarks(), "Kafka Source");
//解析订单数据,只保留需要用到的字段
//goodsCount、goodsPrice、goodsType
SingleOutputStreamOperator<Tuple3<Long, Long, String>> orderStream =
text.flatMap(new FlatMapFunction<String, Tuple3<Long, Long, String>>() {
public void flatMap(String line, Collector<Tuple3<Long, Long, String>> out)
throws Exception {
JSONObject orderJson = JSON.parseObject(line);
//获取 JSON 数据中的商品明细
JSONArray orderDetail = orderJson.getJSONArray("detal");
for (int i = 0; i < orderDetail.size(); i++) {
JSONObject orderObj = orderDetail.getJSONObject(i);
long goodsCount = orderObj.getLongValue("goodsCount");
long goodsPrice = orderObj.getLongValue("goodsPrice");
String goodsType = orderObj.getString("goodsType");
out.collect(new Tuple3<Long, Long, String>(goodsCount, goodsPrice,
goodsType));
}
}
});
//过滤异常数据
SingleOutputStreamOperator<Tuple3<Long, Long, String>> filterStream =
orderStream.filter(new FilterFunction<Tuple3<Long, Long, String>>() {
public boolean filter(Tuple3<Long, Long, String> tup) throws Exception {
//商品数量大于 0 的数据才是有效数据
return tup.f0 > 0;
}
});
//1.统计全站“双十一”当天的实时 GMV
SingleOutputStreamOperator<Long> gmvStream = filterStream
.map(new MapFunction<Tuple3<Long, Long, String>, Long>() {
public Long map(Tuple3<Long, Long, String> tup) throws Exception {
//计算单个商品的消费金额
return tup.f0 * tup.f1;
}
});
//将 GMV 数据保存到 Redis 中
gmvStream.addSink(new GmvRedisSink("192.168.121.128", 6379, "123","gmv"));
//2.统计实时销量 Top N 的商品品类
SingleOutputStreamOperator<Tuple2<String, Long>> topNStream =
filterStream.map(new MapFunction<Tuple3<Long, Long, String>,
Tuple2<String, Long>>() {
//获取商品品类和购买的商品数量
public Tuple2<String, Long> map(Tuple3<Long, Long, String> tup)
throws Exception {
return new Tuple2<String, Long>(tup.f2, tup.f0);
}
});
//根据商品品类分组
KeyedStream<Tuple2<String, Long>, String> keyStream = topNStream.keyBy(tuple -> tuple.f0);
//设置时间窗口为 1 s
WindowedStream<Tuple2<String, Long>, String, TimeWindow> windowStream = keyStream
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)));
//求和,指定 tuple 中的第 2 列,即商品数量
SingleOutputStreamOperator<Tuple2<String, Long>> resStream = windowStream.sum(1);
//将 goods_type 数据保存到 Redis 中
resStream.addSink(new TopNRedisSink("192.168.121.128", 6379, "123","goods_type"));
//执行任务
env.execute("StreamDataCalcJava");
}
}
三、运行测试
(1)首先我们创建好 Kafka 的 topic。然后运行我们编写的 Flink 程序。
kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 5 --replication-factor 1 --topic order_detail
(2)接着创建一个该主题的消息生产者:
kafka-console-producer.sh --broker-list localhost:9092 --topic order_detail
(3)然后发送如下两条测试数据:
(4)Flink 程序运行一段时间,并且成功计算了一批数据后,在 Redis 中可以看到相关的结果。
(2)下面是用户输出实时销量 Top N 的商品品类的自定义 Sink 代码:
{
"detal": [
{
"goodsCount": 1,
"goodsName": "美的 606 升",
"goodsNo": "6002",
"goodsPrice": 100,
"goodsType": "冰箱"
},
{
"goodsCount": 2,
"goodsName": "荣耀 30Pro",
"goodsNo": "1005",
"goodsPrice": 200,
"goodsType": "手机"
}
],
"orderNo": "9fc3ffe3-3255-414e-8df8-1580cfbd2ff9",
"totalPrice": 500
}
{
"detal": [
{
"goodsCount": 2,
"goodsName": "联想 Y50",
"goodsNo": "8001",
"goodsPrice": 300,
"goodsType": "电脑"
},
{
"goodsCount": 1,
"goodsName": "华为 Mate60",
"goodsNo": "1006",
"goodsPrice": 200,
"goodsType": "手机"
}
],
"orderNo": "414effe3-3255-0cfb-8df8-1580cfb255f9",
"totalPrice": 800
}
(4)Flink 程序运行一段时间,并且成功计算了一批数据后,在 Redis 中可以看到相关的结果。
- 下面是查看实时总成交金额:
get gmv
- 下面是查看实时销量 Top N 的商品品类
zrevrange goods_type 0 -1 withscores

附一:Scala 版程序代码
1,自定义 Sink 代码
提示:由于 Flink 内置的 RedisSink 组件无法满足需求,所以这里自定义了 RedisSink 组件。
(1)下面是用户输出实时总成交金额的自定义 Sink 代码:
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import redis.clients.jedis.Jedis
class GmvRedisSink extends RichSinkFunction[Long] {
var host: String = _
var port: Int = _
var key: String = _
var password: String = _
var jedis: Jedis = _
/**
* 构造函数
*
* @param host
* @param port
* @param key
* @param password
*/
def this(host: String, port: Int, password: String, key: String) {
this()
this.host = host
this.port = port
this.password = password
this.key = key
}
/**
* 初始化方法,只执行一次
* 适合初始化资源链接
*
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
this.jedis = new Jedis(host, port)
// 如果设置了密码,进行认证
if (this.password != null && this.password.nonEmpty) {
this.jedis.auth(this.password)
}
}
/**
* 核心代码,来一条数据此方法会执行一次
*
* @param value
* @param context
*/
override def invoke(in: Long): Unit = {
jedis.incrBy(key, in)
}
/**
* 任务停止时会先调用此方法
* 适合关闭资源链接
*/
override def close(): Unit = {
//关闭链接
if (jedis != null) {
jedis.close()
}
}
}
(2)下面是用户输出实时销量 Top N 的商品品类的自定义 Sink 代码:
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import redis.clients.jedis.Jedis
class TopNRedisSink extends RichSinkFunction[(String, Long)] {
var host: String = _
var port: Int = _
var key: String = _
var password: String = _
var jedis: Jedis = _
/**
* 构造函数
*
* @param host
* @param port
* @param key
* @param password
*/
def this(host: String, port: Int, password: String, key: String) {
this()
this.host = host
this.port = port
this.password = password
this.key = key
}
/**
* 初始化方法,只执行一次
* 适合初始化资源链接
*
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
this.jedis = new Jedis(host, port)
// 如果设置了密码,进行认证
if (this.password != null && this.password.nonEmpty) {
this.jedis.auth(password)
}
}
/**
* 核心代码,来一条数据此方法会执行一次
*
* @param value
* @param context
*/
override def invoke(in: (String, Long)): Unit = {
jedis.zincrby(key, in._2, in._1)
}
/**
* 任务停止时会先调用此方法
* 适合关闭资源链接
*/
override def close(): Unit = {
//关闭链接
if (jedis != null) {
jedis.close()
}
}
}2,Flinl 核心程序代码
代码具体流程是:
- 从 Kafka 中读取订单数据,使用 FlatMap 将订单中的商品信息提取并转换为 Tuple3。
- 过滤掉异常数据(商品数量小于等于 0)。
- 对订单数据进行两项实时统计:GMV 统计和实时销量 Top N 的商品品类统计。
- 将 GMV 和 Top N 的统计结果写入 Redis 中。
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object StreamDataCalcScala {
def main(args: Array[String]): Unit = {
//获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
// 指定 KafkaSource 相关配置
val kafkaSource = KafkaSource.builder[String]
.setBootstrapServers("192.168.121.128:9092")
.setTopics("order_detail")
.setGroupId("con")
.setStartingOffsets(OffsetsInitializer.earliest)
.setValueOnlyDeserializer(new SimpleStringSchema())
.build
//指定 Kafka 作为 Source
val text = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
//解析订单数据,将数据打平,只保留需要用到的字段
//goodsCount、goodsPrice、goodsType
val orderStream = text.flatMap(line=>{
val orderJson = JSON.parseObject(line)
val orderDetal = orderJson.getJSONArray("detal")
val res = new Array[(Long,Long,String)](orderDetal.size())
for(i <- 0 until orderDetal.size()){
val orderObj = orderDetal.getJSONObject(i)
val goodsCount = orderObj.getLongValue("goodsCount")
val goodsPrice = orderObj.getLongValue("goodsPrice")
val goodsType = orderObj.getString("goodsType")
res(i) = (goodsCount,goodsPrice,goodsType)
}
res
})
//过滤异常数据
val filterStreram = orderStream.filter(_._1 > 0)
//1. 统计全站“双十一”当天的实时 GMV
val gmvStream = filterStreram.map(tup=>tup._1 * tup._2) //计算单个商品的消费金额
//将 GMV 数据保存到 Redis 中
gmvStream.addSink(new GmvRedisSink("192.168.121.128",6379,"123","gmv"))
//2.统计实时销量 Top N 的商品品类
val topNStream = filterStreram.map(tup=>(tup._3,tup._1)) //获取商品品类和购买的商品数量
.keyBy(tup=>tup._1) //根据商品品类分组
.window(TumblingProcessingTimeWindows.of(Time.seconds(1))) //设置时间窗口为 1 s
.sum(1) //指定 tuple 中的第 2 列,即商品数量
//将 goods_type 数据保存到 Redis 中
topNStream.addSink(new TopNRedisSink("192.168.121.128",6379,"123","goods_type"))
//执行任务
env.execute("StreamDataCalcScala")
}
}
附二:自动提交 offset,防止 Kafka 消息重复消费
1,问题描述
上面样例程序我们运行一段时间后,重新启动时会发现程序又会从 Kafka 的第一条消息开始重新消费一遍,这样就造成 Redis 里面数据重复累计。
2,解决办法
要让程序重启后从上一次位置继续消费,我们将 KafkaSource 配置部分做如下修改,启用自动提交消费位移(offset)的功能,并将消费者的起始位移被设置为已提交的最早位移。
注意:OffsetResetStrategy.EARLIEST 表示如果没有先前的消费位移(例如,新的消费者组),则从最早的可用消息开始消费。这确保了即使是新的消费者组,也能从消息队列的开头开始消费。
// 指定 KafkaSource 相关配置
val kafkaSource = KafkaSource.builder[String]
.setBootstrapServers("192.168.121.128:9092")
.setTopics("order_detail")
.setGroupId("con")
.setProperty("enable.auto.commit","true") // 启用自动提交消费位移(offset)的功能
.setProperty("auto.commit.interval.ms","1000") // 设置自动提交消费位移的时间间隔为1秒
// 消费者的起始位移被设置为已提交的最早位移
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
//.setStartingOffsets(OffsetsInitializer.earliest)
.setValueOnlyDeserializer(new SimpleStringSchema())
.build
//指定 Kafka 作为 Source
val text = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
全部评论(0)