返回 导航

Spark

hangge.com

Spark - Structured Streaming使用详解16(案例实操2:广告点击量实时统计)

作者:hangge | 2024-02-13 10:22
    针对广告点击量实时统计这个需求之前我介绍过如何使用 Spark Streaming 来实现(点击查看),本文接着演示如何使用 Structured Streaming 来实现同样的功能。并且与之前将点击量保存至 MySQL 数据库中不同,这次我们将其保存到 Redis 中。

十六、案例实操2:广告点击量实时统计

1,需求说明

实时统计每天各地区各城市各广告的点击总流量,并将其存入 Redis
  • key 为“date:area:city:ads
  • valuehash 类型。比如其中:field 为“2023-09-14:华南:深圳:5”,对应的 value10000
 

2,模拟生成广告点击数据

(1)在使用进行实时数据处理之前,我们需要写编写一个实时数据生成器。该生成器会自动地不断生成广告点击数据并推送到 Kafka 中。每条记录的内容格式如下:
时间戳 地区名称 城市名称 用户id 广告id

(2)下面是实际生成的数据样例:
1692273126289 华东 上海 4 2
1692273126289 华北 天津 2 1
1692273126289 华北 天津 2 3
1692273126289 华南 深圳 2 5
1692273126289 华南 深圳 1 5

(3)关于实时数据生成器的实现,可以参考我之前写的文章:

3,准备工作

(1)我们项目需要添加 KafkaRedis 相关依赖:
注意:这里引入了 spark-sql-kafka,所以项目之前如果引入过 kafka-clients 可以移除掉,避免版本不一致造成无法消费数据。
<!-- spark-sql-kafka依赖 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.3.1</version>
</dependency>
<!-- jedis依赖 -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.7.0</version>
</dependency>

(2)同时我们编写一个 RedisUtil 工具类,方便我们对 Redis 进行操作:
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object RedisUtil {
  private val jedisPoolConfig: JedisPoolConfig = new JedisPoolConfig()
  jedisPoolConfig.setMaxTotal(100) //最大连接数
  jedisPoolConfig.setMaxIdle(20) //最大空闲
  jedisPoolConfig.setMinIdle(20) //最小空闲
  jedisPoolConfig.setBlockWhenExhausted(true) //忙碌时是否等待
  jedisPoolConfig.setMaxWaitMillis(500) //忙碌时等待时长 毫秒
  jedisPoolConfig.setTestOnBorrow(true) //每次获得连接的进行测试

  private val jedisPool: JedisPool = new JedisPool(jedisPoolConfig, 
    "192.168.60.9", 6379, 1000, "123456")

  // 直接得到一个 Redis 的连接
  def getJedisClient: Jedis = {
    jedisPool.getResource
  }
}

4,编写业务代码

(1)首先我们定义一个 AdsInfo 样例类,用来封装从 Kafka 读取到广告点击信息:
import java.sql.Timestamp

case class AdsInfo(ts: Long, // 1694654067499
                   timestamp: Timestamp, // 2023-09-14 09:14:27.499
                   dayString: String, // 2023-09-14
                   hmString: String, // 09:14
                   area: String, // 华南
                   city: String, // 广州
                   userId: String, // 2
                   adsId: String) // 3

(2)接着我们定义一个用于处理广告点击数据的工具类 AdsClickCountHandler,主要涉及两个功能:统计每天各大区各个城市广告点击总数并保存至 Redis 中。
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import redis.clients.jedis.Jedis

object AdsClickCountHandler {
  val key: String = "date:area:city:ads"

  // 广告点击量实时统计
  def statAdsClickCount(spark: SparkSession, filteredAdsInfoDS: Dataset[AdsInfo]) = {
    // 导入隐式转换
    import spark.implicits._

    filteredAdsInfoDS
      .groupBy($"dayString", $"area", $"city", $"adsId")
      .count()
      .writeStream
      .outputMode("update")
      .foreachBatch{(df: DataFrame, batchId: Long) =>   // 当前分区id, 当前批次ids
        if (df.count() > 0) {
          df.cache() // 做缓存防止重复调用
          df.foreachPartition((rowIt: Iterator[Row]) => {
            val client: Jedis = RedisUtil.getJedisClient
            // 1. 把数据存入到map中, 向redis写入的时候比较方便
            val fieldValueMap: Map[String, String] = rowIt.map(row => {
              // 2023-09-14:华南:深圳:5
              val field: String =
                s"${row.getString(0)}:${row.getString(1)}:${row.getString(2)}:${row.getString(3)}"
              val value: Long = row.getLong(4)
              (field, value.toString)
            }).toMap
            // 2. 写入到redis
            // 用于把scala的集合转换成java的集合
            import scala.collection.JavaConversions._
            if (fieldValueMap.nonEmpty) client.hmset(key, fieldValueMap)
            client.close()
          })
        }
      }
      .start()
      .awaitTermination()  // 等待应用程序终止
  }
}

(3)最后则是主程序代码,它读取 Kafka 中的广告点击日志数据流,对数据进行处理、统计、入库:
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.{Date}
import org.apache.spark.sql._

object RealtimeApp {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark: SparkSession = SparkSession
      .builder()
      .appName("RealtimeApp")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 设置Spark的日志级别为"warn"
    spark.sparkContext.setLogLevel("warn")

    // 日期时间格式化器
    val dayStringFormatter: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val hmStringFormatter: SimpleDateFormat = new SimpleDateFormat("HH:mm")

    // 创建一个流式DataFrame,这里从Kafka中读取数据
    val lines: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.60.9:9092")
      .option("subscribe", "my-topic")
      .load

    // 了方便后续处理, 封装数据到 AdsInfo 样例类中
    val adsInfoDS: Dataset[AdsInfo] = lines.select("value")
      .as[String]
      .map(v => {
        val split: Array[String] = v.split(" ")
        val date: Date = new Date(split(0).toLong)
        AdsInfo(split(0).toLong, new Timestamp(split(0).toLong),
          dayStringFormatter.format(date), hmStringFormatter.format(date),
          split(1), split(2), split(3), split(4))
      })
      .withWatermark("timestamp", "24 hours") // 都是统计每天的数据, 对迟到24小时的数据废弃不用

    // 广告点击量实时统计
    AdsClickCountHandler.statAdsClickCount(spark, adsInfoDS)

    //关闭 Spark
    spark.stop()
  }
}

5,运行测试

(1)首先启动我们之前编写的数据生成器程序(点击查看),启动后可以看到程序每隔 2 秒便会产生一批数据:

(2)接着启动本文编写的广告点击量实时统计程序,访问 Redis 可以看到统计数据在不断的更新:

附:广告黑名单 + 广告点击量实时统计

1,需求说明

    本文样例中的广告点击量实时统计功能是没有考虑黑名单用户的。下面我们做个功能改进,需要与前文的黑名单功能(点击查看)结合起来,即被拉黑的用户点击不计入点击量实时统计数据。

2,样例代码

(1)要实现这个功能需求,首先对前文的黑名单过滤统计工具类 BlackListHandler 中的 awaitTermination() 方法去掉,否则程序启动后至会进行黑名单统计,并不会执行后续的广告点击统计:
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.Trigger
import redis.clients.jedis.Jedis

object BlackListHandler {
  def statBlackList(spark: SparkSession, adsInfoDS: Dataset[AdsInfo]) = {
    import spark.implicits._

    // 1. 过滤黑名单的数据: 如果有用户已经进入黑名单, 则不再统计这个用户的广告点击记录
    val filteredAdsInfoDS: Dataset[AdsInfo] = adsInfoDS.mapPartitions(adsInfoIt => {
      // 每个分区连接一次到redis读取黑名单, 然后把进入黑名单用户点击记录过滤掉
      val adsInfoList: List[AdsInfo] = adsInfoIt.toList
      if (adsInfoList.isEmpty) {
        adsInfoList.toIterator
      } else {
        // 先读取到黑名单
        val client: Jedis = RedisUtil.getJedisClient

        val blackList: java.util.Set[String] =
          client.smembers(s"day:blcklist:${adsInfoList(0).dayString}")

        // 过滤
        adsInfoList.filter(adsInfo => {
          !blackList.contains(adsInfo.userId)
        }).toIterator
      }

    })

    // 创建临时表: tb_ads_info
    filteredAdsInfoDS.createOrReplaceTempView("tb_ads_info")

    // 2. 黑名单记录
    // 按照每天每用户每id分组, 然后计数, 计数超过阈值(100)的查询出来
    val result: DataFrame = spark.sql(
      """
        |select
        | dayString,
        | userId
        |from tb_ads_info
        |group by dayString, userId, adsId
        |having count(1) >= 50
            """.stripMargin)

    // 3.把点击量超过 100 的写入到redis中.
    result.writeStream
      .outputMode("update")
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .foreach(new ForeachWriter[Row] {
        var client: Jedis = _

        override def open(partitionId: Long, epochId: Long): Boolean = {
          // 打开到redis的连接
          client = RedisUtil.getJedisClient
          client != null
        }

        override def process(value: Row): Unit = {
          // 写入到redis  把每天的黑名单写入到set中  key: "day:blacklist" value: 黑名单用户
          val dayString: String = value.getString(0)
          val userId: String = value.getString(1)
          client.sadd(s"day:blcklist:$dayString", userId)
        }

        override def close(errorOrNull: Throwable): Unit = {
          // 关闭到redis的连接
          if (client != null) client.close()
        }
      })
      .option("checkpointLocation", "./blacklist")
      .start()
      // .awaitTermination() // 等待应用程序终止

    // 4.把过滤后的数据返回(在其他地方也可以使用临时表: tb_ads_info)
    filteredAdsInfoDS
  }
}

(2)主程序代码这边只要在对 Dataset 进行广告量实时统计之前,先经过黑名单数据流处理即可,这样即可对过滤后的点击进行统计。
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.{Date}
import org.apache.spark.sql._

object RealtimeApp {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark: SparkSession = SparkSession
      .builder()
      .appName("RealtimeApp")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 设置Spark的日志级别为"warn"
    spark.sparkContext.setLogLevel("warn")

    // 日期时间格式化器
    val dayStringFormatter: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val hmStringFormatter: SimpleDateFormat = new SimpleDateFormat("HH:mm")

    // 创建一个流式DataFrame,这里从Kafka中读取数据
    val lines: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.60.9:9092")
      .option("subscribe", "my-topic")
      .load

    // 了方便后续处理, 封装数据到 AdsInfo 样例类中
    val adsInfoDS: Dataset[AdsInfo] = lines.select("value")
      .as[String]
      .map(v => {
        val split: Array[String] = v.split(" ")
        val date: Date = new Date(split(0).toLong)
        AdsInfo(split(0).toLong, new Timestamp(split(0).toLong),
          dayStringFormatter.format(date), hmStringFormatter.format(date),
          split(1), split(2), split(3), split(4))
      })
      .withWatermark("timestamp", "24 hours") // 都是统计每天的数据, 对迟到24小时的数据废弃不用

    // 黑名单
    val filteredAdsInfoDS: Dataset[AdsInfo] = BlackListHandler.statBlackList(spark, adsInfoDS)

    // 广告点击量实时统计
    AdsClickCountHandler.statAdsClickCount(spark, filteredAdsInfoDS)

    //关闭 Spark
    spark.stop()
  }
}

(3)同时由于前面黑名单处理时就已经对过滤后的点击记录创建了临时表 tb_ads_info,因此我们的广告点击统计工具类中也可以直接查询 tb_ads_info 进行统计,而不使用传入的 Dataset 进行统计(当然不改动也是可以的):
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import redis.clients.jedis.Jedis

object AdsClickCountHandler {
  val key: String = "date:area:city:ads"

  // 广告点击量实时统计
  def statAdsClickCount(spark: SparkSession, filteredAdsInfoDS: Dataset[AdsInfo]) = {
    // 导入隐式转换
    import spark.implicits._

    spark.sql(
      s"""
         |select
         |    dayString,
         |    area,
         |    city,
         |    adsId,
         |    count(1) count
         |from tb_ads_info
         |group by dayString, area, city, adsId
      """.stripMargin)
      .writeStream
      .outputMode("update")
      .foreachBatch{(df: DataFrame, batchId: Long) =>   // 当前分区id, 当前批次ids
        if (df.count() > 0) {
          df.cache() // 做缓存防止重复调用
          df.foreachPartition((rowIt: Iterator[Row]) => {
            val client: Jedis = RedisUtil.getJedisClient
            // 1. 把数据存入到map中, 向redis写入的时候比较方便
            val fieldValueMap: Map[String, String] = rowIt.map(row => {
              // 2023-09-14:华南:深圳:5
              val field: String =
                s"${row.getString(0)}:${row.getString(1)}:${row.getString(2)}:${row.getString(3)}"
              val value: Long = row.getLong(4)
              (field, value.toString)
            }).toMap
            // 2. 写入到redis
            // 用于把scala的集合转换成java的集合
            import scala.collection.JavaConversions._
            if (fieldValueMap.nonEmpty) client.hmset(key, fieldValueMap)
            client.close()
          })
        }
      }
      .start()
      .awaitTermination()  // 等待应用程序终止
  }
}
评论

全部评论(0)

回到顶部