返回 导航

Spark

hangge.com

Spark - RDD使用详解12(案例实操1:统计各省点击量TOP3广告)

作者:hangge | 2023-11-10 08:59

十五、案例实操1:统计各省点击量TOP3广告

1,需求描述

(1)我们有一个记录广告点击的日志文件 agent.log,文件内容如下(部分),其中时间戳,省份,城市,用户,广告,中间字段使用空格分隔。
1516609143871 9 9 79 18
1516609143871 9 0 69 10
1516609143871 9 6 92 4
1516609143871 8 1 41 4
1516609144913 1 3 87 28
1516609144913 8 0 2 5
1516609144913 5 9 46 23
1516609144913 4 0 7 24
1516609144913 9 5 99 11
1516609144913 7 1 36 16
1516609144913 2 3 96 13
1516609144913 3 1 31 10
1516609144913 0 9 20 27

(2)现要求统计出每一个省份每个广告被点击数量排行的 Top3

2,功能实现

(1)下面是实现代码,具体逻辑如下:
  • 读取 agent.log 文件,并创建 RDD(弹性分布式数据集)logData,每一行代表一个记录。
  • 对每一行记录进行处理,通过 map 转换操作,提取出省份和广告数据,并将其封装为键值对((省份, 广告), 1)。
  • 使用 reduceByKey 对键值对进行聚合,得到每个省份每个广告的点击数量,键为(省份, 广告),值为对应的点击数量。
  • 对聚合结果进行结构转换,通过 map 操作,将键值对(省份, (广告, 点击数量))进行重新封装。
  • 将结果按省份进行分组,使用 groupByKey 操作,得到每个省份对应的广告点击数据列表,键为省份,值为包含广告和点击数量的列表。
  • 对每个省份的广告点击数据列表进行处理,通过 mapValues 操作,对每个列表进行排序、取 Top3 操作,得到每个省份的 Top3 广告点击数据。
object Hello {
  def main(args: Array[String]): Unit = {

    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    // 创建 Spark 上下文环境对象(连接对象)
    val sc: SparkContext = new SparkContext(sparkConf)

    // 读取agent.log文件并创建RDD
    val logData = sc.textFile("datas/agent.log")

    // 提取省份、广告数据 => ((省份,广告), 1)
    val extractedData = logData.map(
      line => {
        val datas = line.split(" ")
        ((datas(1), datas(4)), 1)
      }
    )

    // 统计每个省份每个广告的点击数量 => ((省份,广告), sum)
    val adClickCounts = extractedData.reduceByKey(_ + _)

    // 将聚合结果进行结构转换 => (省份, (广告, sum))
    val adClickCounts2 = adClickCounts.map {
      case ((province, ad), count) => (province, (ad, count))
    }

    // 按省份分组 => (省份, [(广告1, sum1),(广告2, sum2)])
    val clicksByProvince = adClickCounts2.groupByKey()

    // 取每个省份的Top3广告点击数量
    val top3ClicksByProvince = clicksByProvince.mapValues(
      iter => {
        iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
      }
    )

    // 打印结果
    top3ClicksByProvince.collect().foreach(println)

    //关闭 Spark
    sc.stop()
  }
}

(2)运行结果如下:
评论

全部评论(0)

回到顶部