返回 导航

Spark

hangge.com

Spark - Structured Streaming使用详解15(案例实操1:广告黑名单)

作者:hangge | 2024-02-09 09:23
    我在之前的文章中介绍了如何使用 Spark Streaming 来实现广告黑名单这个需求(点击查看),本文接着演示如何使用 Structured Streaming 来实现同样的功能。并且与之前将黑名单保存至 MySQL 数据库中不同,这次我们将其保存到 Redis 中。

十五、案例实操1:广告黑名单

1,需求说明

(1)我们需要实现实时的动态黑名单机制,即将每天对某个广告点击超过 100 次的用户拉黑:
  • 黑名单应该是每天更新一次. 如果昨天进入黑名单, 今天应该重新再统计
  • 把黑名单写入到 redis 中, 以供其他应用查看
  • 已经进入黑名单的用户不再进行检测(提高效率)
(2)实现思路:
  • 写入到黑名单redis 中每日的黑名单使用 setset 中的每个元素表示一个用户。通过 sql 查询过滤出来每天每广告点击数超过阈值的用户,然后使用 foreach 写入到 redis 即可。
  • 过滤黑名单的用户点击记录:先从 redis 读取到所有黑名单数据,然后过滤,只保留非黑名单用户的点击记录。
 

2,模拟生成广告点击数据

(1)在使用进行实时数据处理之前,我们需要写编写一个实时数据生成器。该生成器会自动地不断生成广告点击数据并推送到 Kafka 中。每条记录的内容格式如下:
时间戳 地区名称 城市名称 用户id 广告id

(2)下面是实际生成的数据样例:
1692273126289 华东 上海 4 2
1692273126289 华北 天津 2 1
1692273126289 华北 天津 2 3
1692273126289 华南 深圳 2 5
1692273126289 华南 深圳 1 5

(3)关于实时数据生成器的实现,可以参考我之前写的文章:

3,准备工作

(1)我们项目需要添加 KafkaRedis 相关依赖:
注意:这里引入了 spark-sql-kafka,所以项目之前如果引入过 kafka-clients 可以移除掉,避免版本不一致造成无法消费数据。
<!-- spark-sql-kafka依赖 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.3.1</version>
</dependency>
<!-- jedis依赖 -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.7.0</version>
</dependency>
(2)同时我们编写一个 RedisUtil 工具类,方便我们对 Redis 进行操作:
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object RedisUtil {
  private val jedisPoolConfig: JedisPoolConfig = new JedisPoolConfig()
  jedisPoolConfig.setMaxTotal(100) //最大连接数
  jedisPoolConfig.setMaxIdle(20) //最大空闲
  jedisPoolConfig.setMinIdle(20) //最小空闲
  jedisPoolConfig.setBlockWhenExhausted(true) //忙碌时是否等待
  jedisPoolConfig.setMaxWaitMillis(500) //忙碌时等待时长 毫秒
  jedisPoolConfig.setTestOnBorrow(true) //每次获得连接的进行测试

  private val jedisPool: JedisPool = new JedisPool(jedisPoolConfig, "192.168.60.9", 6379)

  // 直接得到一个 Redis 的连接
  def getJedisClient: Jedis = {
    jedisPool.getResource
  }
}

4,编写业务代码

(1)首先我们定义一个 AdsInfo 样例类,用来封装从 Kafka 读取到广告点击信息:
import java.sql.Timestamp

case class AdsInfo(ts: Long, // 1694654067499
                   timestamp: Timestamp, // 2023-09-14 09:14:27.499
                   dayString: String, // 2023-09-14
                   hmString: String, // 09:14
                   area: String, // 华南
                   city: String, // 广州
                   userId: String, // 2
                   adsId: String) // 3

(2)接着我们定义一个进行黑名单业务处理的工具类 BlackListHandler,主要涉及两个功能:添加黑名单和过滤黑名单。
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.Trigger
import redis.clients.jedis.Jedis

object BlackListHandler {
  def statBlackList(spark: SparkSession, adsInfoDS: Dataset[AdsInfo]) = {
    import spark.implicits._

    // 1. 过滤黑名单的数据: 如果有用户已经进入黑名单, 则不再统计这个用户的广告点击记录
    val filteredAdsInfoDS: Dataset[AdsInfo] = adsInfoDS.mapPartitions(adsInfoIt => {
      // 每个分区连接一次到redis读取黑名单, 然后把进入黑名单用户点击记录过滤掉
      val adsInfoList: List[AdsInfo] = adsInfoIt.toList
      if (adsInfoList.isEmpty) {
        adsInfoList.toIterator
      } else {
        // 先读取到黑名单
        val client: Jedis = RedisUtil.getJedisClient

        val blackList: java.util.Set[String] =
          client.smembers(s"day:blcklist:${adsInfoList(0).dayString}")

        // 过滤
        adsInfoList.filter(adsInfo => {
          !blackList.contains(adsInfo.userId)
        }).toIterator
      }

    })

    // 创建临时表: tb_ads_info
    filteredAdsInfoDS.createOrReplaceTempView("tb_ads_info")

    // 2. 黑名单记录
    // 按照每天每用户每id分组, 然后计数, 计数超过阈值(100)的查询出来
    val result: DataFrame = spark.sql(
      """
        |select
        | dayString,
        | userId
        |from tb_ads_info
        |group by dayString, userId, adsId
        |having count(1) >= 100
            """.stripMargin)

    // 3.把点击量超过 100 的写入到redis中.
    result.writeStream
      .outputMode("update")
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .foreach(new ForeachWriter[Row] {
        var client: Jedis = _

        override def open(partitionId: Long, epochId: Long): Boolean = {
          // 打开到redis的连接
          client = RedisUtil.getJedisClient
          client != null
        }

        override def process(value: Row): Unit = {
          // 写入到redis  把每天的黑名单写入到set中  key: "day:blacklist" value: 黑名单用户
          val dayString: String = value.getString(0)
          val userId: String = value.getString(1)
          client.sadd(s"day:blcklist:$dayString", userId)
        }

        override def close(errorOrNull: Throwable): Unit = {
          // 关闭到redis的连接
          if (client != null) client.close()
        }
      })
      .option("checkpointLocation", "./blacklist")
      .start()
      .awaitTermination() // 等待应用程序终止

    // 4.把过滤后的数据返回(在其他地方也可以使用临时表: tb_ads_info)
    filteredAdsInfoDS
  }
}

(3)最后则是主程序代码,它读取 Kafka 中的广告点击日志数据流,对数据进行过滤、统计和黑名单处理:
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql._

object RealtimeApp {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark: SparkSession = SparkSession
      .builder()
      .appName("RealtimeApp")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 设置Spark的日志级别为"warn"
    spark.sparkContext.setLogLevel("warn")

    // 日期时间格式化器
    val dayStringFormatter: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val hmStringFormatter: SimpleDateFormat = new SimpleDateFormat("HH:mm")

    // 创建一个流式DataFrame,这里从Kafka中读取数据
    val lines: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.60.9:9092")
      .option("subscribe", "my-topic")
      .load

    // 了方便后续处理, 封装数据到 AdsInfo 样例类中
    val adsInfoDS: Dataset[AdsInfo] = lines.select("value")
      .as[String]
      .map(v => {
        val split: Array[String] = v.split(" ")
        val date: Date = new Date(split(0).toLong)
        AdsInfo(split(0).toLong, new Timestamp(split(0).toLong),
          dayStringFormatter.format(date), hmStringFormatter.format(date),
          split(1), split(2), split(3), split(4))
      })
      .withWatermark("timestamp", "24 hours") // 都是统计每天的数据, 对迟到24小时的数据废弃不用

    // 黑名单
    BlackListHandler.statBlackList(spark, adsInfoDS)

    //关闭 Spark
    spark.stop()
  }
}

5,运行测试

(1)首先启动我们之前编写的数据生成器程序(点击查看),启动后可以看到程序每隔 2 秒便会产生一批数据:

(2)稍等一会后,当某用户针对广告点击数到达 30 次时,该用户便会添加到 Redis 中:
评论

全部评论(0)

回到顶部