Spark - Structured Streaming使用详解15(案例实操1:广告黑名单)
作者:hangge | 2024-02-09 09:23
我在之前的文章中介绍了如何使用 Spark Streaming 来实现广告黑名单这个需求(点击查看),本文接着演示如何使用 Structured Streaming 来实现同样的功能。并且与之前将黑名单保存至 MySQL 数据库中不同,这次我们将其保存到 Redis 中。
十五、案例实操1:广告黑名单
1,需求说明
(1)我们需要实现实时的动态黑名单机制,即将每天对某个广告点击超过 100 次的用户拉黑:
- 黑名单应该是每天更新一次. 如果昨天进入黑名单, 今天应该重新再统计
- 把黑名单写入到 redis 中, 以供其他应用查看
- 已经进入黑名单的用户不再进行检测(提高效率)
(2)实现思路:
- 写入到黑名单:redis 中每日的黑名单使用 set,set 中的每个元素表示一个用户。通过 sql 查询过滤出来每天每广告点击数超过阈值的用户,然后使用 foreach 写入到 redis 即可。
- 过滤黑名单的用户点击记录:先从 redis 读取到所有黑名单数据,然后过滤,只保留非黑名单用户的点击记录。
2,模拟生成广告点击数据
(1)在使用进行实时数据处理之前,我们需要写编写一个实时数据生成器。该生成器会自动地不断生成广告点击数据并推送到 Kafka 中。每条记录的内容格式如下:
时间戳 地区名称 城市名称 用户id 广告id
(2)下面是实际生成的数据样例:
1692273126289 华东 上海 4 2 1692273126289 华北 天津 2 1 1692273126289 华北 天津 2 3 1692273126289 华南 深圳 2 5 1692273126289 华南 深圳 1 5
(3)关于实时数据生成器的实现,可以参考我之前写的文章:
3,准备工作
(1)我们项目需要添加 Kafka、Redis 相关依赖:
注意:这里引入了 spark-sql-kafka,所以项目之前如果引入过 kafka-clients 可以移除掉,避免版本不一致造成无法消费数据。
<!-- spark-sql-kafka依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.3.1</version>
</dependency>
<!-- jedis依赖 -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.7.0</version>
</dependency>
(2)同时我们编写一个 RedisUtil 工具类,方便我们对 Redis 进行操作:
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
object RedisUtil {
private val jedisPoolConfig: JedisPoolConfig = new JedisPoolConfig()
jedisPoolConfig.setMaxTotal(100) //最大连接数
jedisPoolConfig.setMaxIdle(20) //最大空闲
jedisPoolConfig.setMinIdle(20) //最小空闲
jedisPoolConfig.setBlockWhenExhausted(true) //忙碌时是否等待
jedisPoolConfig.setMaxWaitMillis(500) //忙碌时等待时长 毫秒
jedisPoolConfig.setTestOnBorrow(true) //每次获得连接的进行测试
private val jedisPool: JedisPool = new JedisPool(jedisPoolConfig, "192.168.60.9", 6379)
// 直接得到一个 Redis 的连接
def getJedisClient: Jedis = {
jedisPool.getResource
}
}
4,编写业务代码
(1)首先我们定义一个 AdsInfo 样例类,用来封装从 Kafka 读取到广告点击信息:
import java.sql.Timestamp
case class AdsInfo(ts: Long, // 1694654067499
timestamp: Timestamp, // 2023-09-14 09:14:27.499
dayString: String, // 2023-09-14
hmString: String, // 09:14
area: String, // 华南
city: String, // 广州
userId: String, // 2
adsId: String) // 3
(2)接着我们定义一个进行黑名单业务处理的工具类 BlackListHandler,主要涉及两个功能:添加黑名单和过滤黑名单。
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.Trigger
import redis.clients.jedis.Jedis
object BlackListHandler {
def statBlackList(spark: SparkSession, adsInfoDS: Dataset[AdsInfo]) = {
import spark.implicits._
// 1. 过滤黑名单的数据: 如果有用户已经进入黑名单, 则不再统计这个用户的广告点击记录
val filteredAdsInfoDS: Dataset[AdsInfo] = adsInfoDS.mapPartitions(adsInfoIt => {
// 每个分区连接一次到redis读取黑名单, 然后把进入黑名单用户点击记录过滤掉
val adsInfoList: List[AdsInfo] = adsInfoIt.toList
if (adsInfoList.isEmpty) {
adsInfoList.toIterator
} else {
// 先读取到黑名单
val client: Jedis = RedisUtil.getJedisClient
val blackList: java.util.Set[String] =
client.smembers(s"day:blcklist:${adsInfoList(0).dayString}")
// 过滤
adsInfoList.filter(adsInfo => {
!blackList.contains(adsInfo.userId)
}).toIterator
}
})
// 创建临时表: tb_ads_info
filteredAdsInfoDS.createOrReplaceTempView("tb_ads_info")
// 2. 黑名单记录
// 按照每天每用户每id分组, 然后计数, 计数超过阈值(100)的查询出来
val result: DataFrame = spark.sql(
"""
|select
| dayString,
| userId
|from tb_ads_info
|group by dayString, userId, adsId
|having count(1) >= 100
""".stripMargin)
// 3.把点击量超过 100 的写入到redis中.
result.writeStream
.outputMode("update")
.trigger(Trigger.ProcessingTime("2 seconds"))
.foreach(new ForeachWriter[Row] {
var client: Jedis = _
override def open(partitionId: Long, epochId: Long): Boolean = {
// 打开到redis的连接
client = RedisUtil.getJedisClient
client != null
}
override def process(value: Row): Unit = {
// 写入到redis 把每天的黑名单写入到set中 key: "day:blacklist" value: 黑名单用户
val dayString: String = value.getString(0)
val userId: String = value.getString(1)
client.sadd(s"day:blcklist:$dayString", userId)
}
override def close(errorOrNull: Throwable): Unit = {
// 关闭到redis的连接
if (client != null) client.close()
}
})
.option("checkpointLocation", "./blacklist")
.start()
.awaitTermination() // 等待应用程序终止
// 4.把过滤后的数据返回(在其他地方也可以使用临时表: tb_ads_info)
filteredAdsInfoDS
}
}
(3)最后则是主程序代码,它读取 Kafka 中的广告点击日志数据流,对数据进行过滤、统计和黑名单处理:
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql._
object RealtimeApp {
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark: SparkSession = SparkSession
.builder()
.appName("RealtimeApp")
.master("local[*]")
.getOrCreate()
// 导入隐式转换
import spark.implicits._
// 设置Spark的日志级别为"warn"
spark.sparkContext.setLogLevel("warn")
// 日期时间格式化器
val dayStringFormatter: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
val hmStringFormatter: SimpleDateFormat = new SimpleDateFormat("HH:mm")
// 创建一个流式DataFrame,这里从Kafka中读取数据
val lines: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "192.168.60.9:9092")
.option("subscribe", "my-topic")
.load
// 了方便后续处理, 封装数据到 AdsInfo 样例类中
val adsInfoDS: Dataset[AdsInfo] = lines.select("value")
.as[String]
.map(v => {
val split: Array[String] = v.split(" ")
val date: Date = new Date(split(0).toLong)
AdsInfo(split(0).toLong, new Timestamp(split(0).toLong),
dayStringFormatter.format(date), hmStringFormatter.format(date),
split(1), split(2), split(3), split(4))
})
.withWatermark("timestamp", "24 hours") // 都是统计每天的数据, 对迟到24小时的数据废弃不用
// 黑名单
BlackListHandler.statBlackList(spark, adsInfoDS)
//关闭 Spark
spark.stop()
}
}
5,运行测试
(1)首先启动我们之前编写的数据生成器程序(点击查看),启动后可以看到程序每隔 2 秒便会产生一批数据:

(2)稍等一会后,当某用户针对广告点击数到达 30 次时,该用户便会添加到 Redis 中:

全部评论(0)