返回 导航

Spark

hangge.com

Spark - Spark Streaming使用详解14(案例实操4:最近一小时广告点击量)

作者:hangge | 2023-12-29 10:16

十四、案例实操4:最近一小时广告点击量

1,需求说明

(1)实时统计各个广告最近一小时内各分钟的点击量,结果类似如下:
(1,List((17:02,137), (17:03,242),........ (18:02,36)))
(2,List((17:02,146), (17:03,249),........ (18:02,27)))
(3,List((17:02,171), (17:03,248),........ (18:02,22)))

(2)该需求实现步骤如下:
  • 开窗确定时间范围(1 小时)
  • 在窗口内将数据转换数据结构为 ((adid,hm),count)
  • 按照广告 id 进行分组处理,组内按照时分排序

2,准备工作

(1)首先我们需要一个实时数据生成器,用于不断的生成用户点击广告数据并推送到 Kafka 中,具体可以参考我之前的问题:

(2)接着我们项目需要添加 Kafka 相关依赖:
<!-- streaming-kafka依赖 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.13</artifactId>
    <version>3.4.0</version>
</dependency>

3,编写工具类

由于我们的数据源是 Kafka,这里我们编写一个 MyKafkaUtil 工具类,用于创建一个读取 Kafka 数据的 SparkStreaming
object MyKafkaUtil {
  // kafka 消费者配置
  val kafkaParam = Map(
    "bootstrap.servers" -> "192.168.60.9:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    //消费者组
    "group.id" -> "commerce-consumer-group",
    //如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性
    //可以使用这个配置,latest 自动重置偏移量为最新的偏移量
    "auto.offset.reset" -> "latest",
    //如果是 true,则这个消费者的偏移量会在后台自动提交,但是 kafka 宕机容易丢失数据
    //如果是 false,会需要手动维护 kafka 偏移量
    "enable.auto.commit" -> (true: java.lang.Boolean)
  )

  // 创建 DStream,返回接收到的输入数据
  // LocationStrategies:根据给定的主题和集群地址创建 consumer
  // LocationStrategies.PreferConsistent:持续的在所有 Executor 之间分配分区
  // ConsumerStrategies:选择如何在 Driver 和 Executor 上创建和配置 Kafka Consumer
  // ConsumerStrategies.Subscribe:订阅一系列主题
  def getKafkaStream(topic: String, ssc: StreamingContext):
  InputDStream[ConsumerRecord[String, String]] = {
    val dStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](ssc,
        LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,
          String](Array(topic), kafkaParam))
    dStream
  }
}

4,编写业务代码

(1)首先我们定义一个使用开窗进行广告点击统计的工具类 getAdHourMintToCount,用于统计最近一小时广告分时点击总数。
object LastHourAdCountHandler {
  //时间格式化对象
  private val sdf: SimpleDateFormat = new SimpleDateFormat("HH:mm")
  /**
   * 统计最近一小时广告分时点击总数
   *
   * @param filterAdsLogDStream 过滤后的数据集
   * @return
   */
  def getAdHourMintToCount(filterAdsLogDStream: DStream[Ads_log]):
  DStream[(String, List[(String, Long)])] = {
    //1.开窗 => 时间间隔为 1 个小时 window()
    val windowAdsLogDStream: DStream[Ads_log] =
      filterAdsLogDStream.window(Minutes(60))

    //2.转换数据结构 ads_log =>((adid,hm),1L) map()
    val adHmToOneDStream: DStream[((String, String), Long)] =
      windowAdsLogDStream.map(adsLog => {
        val timestamp: Long = adsLog.timestamp
        val hm: String = sdf.format(new Date(timestamp))
        ((adsLog.adid, hm), 1L)
      })

    //3.统计总数 ((adid,hm),1L)=>((adid,hm),sum) reduceBykey(_+_)
    val adHmToCountDStream: DStream[((String, String), Long)] =
      adHmToOneDStream.reduceByKey(_ + _)

    //4.转换数据结构 ((adid,hm),sum)=>(adid,(hm,sum)) map()
    val adToHmCountDStream: DStream[(String, (String, Long))] =
      adHmToCountDStream.map { case ((adid, hm), count) =>
        (adid, (hm, count))
      }

    //5.按照 adid 分组 (adid,(hm,sum))=>(adid,Iter[(hm,sum),...]) groupByKey
    adToHmCountDStream.groupByKey()
      .mapValues(iter =>
        iter.toList.sortWith(_._1 < _._1)
      )
  }
}

(2)接着则是主程序代码,它读取 Kafka 中的广告点击日志数据流,对数据进行处理、统计、打印:
object RealTimeApp {
  def main(args: Array[String]): Unit = {
    //1.创建 SparkConf
    val sparkConf: SparkConf = new
        SparkConf().setMaster("local[*]").setAppName("RealTimeApp")

    //2.创建 StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    //3.读取 Kafka 数据 1583288137305 华南 深圳 4 3
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
      MyKafkaUtil.getKafkaStream("my-topic", ssc)

    //4.将每一行数据转换为样例类对象
    val adsLogDStream: DStream[Ads_log] = kafkaDStream.map(record => {
      //a.取出 value 并按照" "切分
      val arr: Array[String] = record.value().split(" ")
      //b.封装为样例类对象
      Ads_log(arr(0).toLong, arr(1), arr(2), arr(3), arr(4))
    })

    //5.统计最近一小时广告分时点击总数
    val adToHmCountListDStream: DStream[(String, List[(String, Long)])] =
      LastHourAdCountHandler.getAdHourMintToCount(adsLogDStream)

    //6.打印
    adToHmCountListDStream.print()

    //7.开启任务
    ssc.start()
    ssc.awaitTermination()
  }
}

5,运行测试

(1)首先启动我们之前编写的数据生成器程序(点击查看),启动后可以看到程序每隔2秒便会产生一批数据:

(2)接着启动本文编写的广告点击量实时统计程序,可以看到控制台会不断打印出最近一小时的广告点击量:
评论

全部评论(0)

回到顶部