Spark - Structured Streaming使用详解17(案例实操3:最近一小时广告点击量)
作者:hangge | 2024-02-15 09:28
十七、案例实操3:最近一小时广告点击量
1,需求说明
(1)实时统计各个广告最近一小时内各分钟的点击量,结果类似如下:
[1,WrappedArray([17:02,137], [17:03,212],........ [18:02,76])] [2,WrappedArray([17:02,244], [17:03,152],........ [18:02,34])] [3,WrappedArray([17:02,351], [17:03,242],........ [18:02,36])]
(2)该需求实现关键步骤如下:
- 设置 watermark 水印,仅保留 1 小时内的数据,并废弃 1 小时之前的数据。
- 使用 groupBy 操作按广告 ID、时间和窗口进行分组,然后使用 count 方法计算每个广告在每个时间窗口内的点击量。此时聚合后的结果如下:
- 经过上面处理的得到数据还不满足需求,我们需要在输出流中,使用 foreachBatch 方法对每个批次的数据进行进一步处理,按广告 ID 进行分组聚合,结果如下:

2,模拟生成广告点击数据
(1)在使用进行实时数据处理之前,我们需要写编写一个实时数据生成器。该生成器会自动地不断生成广告点击数据并推送到 Kafka 中。每条记录的内容格式如下:
时间戳 地区名称 城市名称 用户id 广告id
(2)下面是实际生成的数据样例:
1692273126289 华东 上海 4 2 1692273126289 华北 天津 2 1 1692273126289 华北 天津 2 3 1692273126289 华南 深圳 2 5 1692273126289 华南 深圳 1 5
(3)关于实时数据生成器的实现,可以参考我之前写的文章:
3,准备工作
我们项目需要添加Kafka相关依赖:
注意:这里引入了 spark-sql-kafka,所以项目之前如果引入过 kafka-clients 可以移除掉,避免版本不一致造成无法消费数据。
<!-- spark-sql-kafka依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.3.1</version>
</dependency>
4,编写业务代码
(1)首先我们定义一个 AdsInfo 样例类,用来封装从 Kafka 读取到广告点击信息:
import java.sql.Timestamp
case class AdsInfo(ts: Long, // 1694654067499
timestamp: Timestamp, // 2023-09-14 09:14:27.499
dayString: String, // 2023-09-14
hmString: String, // 09:14
area: String, // 华南
city: String, // 广州
userId: String, // 2
adsId: String) // 3
(2)最后则是主程序代码,它读取 Kafka 中的广告点击日志数据流,进行实时统计每个广告在不同时间窗口内的点击量,并将结果以批次的形式打印出来:
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{collect_list, struct, window}
object RealtimeApp {
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark: SparkSession = SparkSession
.builder()
.appName("RealtimeApp")
.master("local[*]")
.getOrCreate()
// 导入隐式转换
import spark.implicits._
// 设置Spark的日志级别为"warn"
spark.sparkContext.setLogLevel("warn")
// 日期时间格式化器
val dayStringFormatter: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
val hmStringFormatter: SimpleDateFormat = new SimpleDateFormat("HH:mm")
// 创建一个流式DataFrame,这里从Kafka中读取数据
val lines: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "192.168.60.9:9092")
.option("subscribe", "my-topic")
.load
// 了方便后续处理, 封装数据到 AdsInfo 样例类中
val adsInfoDS: Dataset[AdsInfo] = lines.select("value")
.as[String]
.map(v => {
val split: Array[String] = v.split(" ")
val date: Date = new Date(split(0).toLong)
AdsInfo(split(0).toLong, new Timestamp(split(0).toLong),
dayStringFormatter.format(date), hmStringFormatter.format(date),
split(1), split(2), split(3), split(4))
})
.withWatermark("timestamp", "1 hours") // 只统计1小时数据, 对迟到1小时的数据废弃不用
// 广告点击量实时统计
val resultStream = adsInfoDS
.groupBy($"adsId", $"hmString", window($"timestamp", "1 minute"))
.count()
.orderBy($"hmString")
// 打印结果
val query = resultStream.writeStream
.outputMode("complete")
.foreachBatch{ (df: DataFrame, batchId: Long) => // 当前分区id, 当前批次id
println("\n\nbatchId:" + batchId)
// 按广告id进行分组聚合
val adToHmCountDF = df.groupBy("adsId")
.agg(collect_list(struct($"hmString", $"count")) as "minuteCount")
adToHmCountDF.show(false)
// 使用collect()方法将DataFrame中的数据收集到本地驱动器
val data = adToHmCountDF.collect()
// 打印数据
data.foreach(row => println(row))
}
.start()
// 等待应用程序终止
query.awaitTermination()
//关闭 Spark
spark.stop()
}
}
5,运行测试
(1)首先启动我们之前编写的数据生成器程序(点击查看),启动后可以看到程序每隔 2 秒便会产生一批数据:

(2)接着启动本文编写的广告点击量实时统计程序,可以看到控制台会不断打印出最近一小时的广告点击量:
全部评论(0)