Spark - RDD使用详解12(案例实操1:统计各省点击量TOP3广告)
作者:hangge | 2023-11-10 08:59
十五、案例实操1:统计各省点击量TOP3广告
1,需求描述
(1)我们有一个记录广告点击的日志文件 agent.log,文件内容如下(部分),其中时间戳,省份,城市,用户,广告,中间字段使用空格分隔。
1516609143871 9 9 79 18 1516609143871 9 0 69 10 1516609143871 9 6 92 4 1516609143871 8 1 41 4 1516609144913 1 3 87 28 1516609144913 8 0 2 5 1516609144913 5 9 46 23 1516609144913 4 0 7 24 1516609144913 9 5 99 11 1516609144913 7 1 36 16 1516609144913 2 3 96 13 1516609144913 3 1 31 10 1516609144913 0 9 20 27
(2)现要求统计出每一个省份每个广告被点击数量排行的 Top3。
2,功能实现
(1)下面是实现代码,具体逻辑如下:
- 读取 agent.log 文件,并创建 RDD(弹性分布式数据集)logData,每一行代表一个记录。
- 对每一行记录进行处理,通过 map 转换操作,提取出省份和广告数据,并将其封装为键值对((省份, 广告), 1)。
- 使用 reduceByKey 对键值对进行聚合,得到每个省份每个广告的点击数量,键为(省份, 广告),值为对应的点击数量。
- 对聚合结果进行结构转换,通过 map 操作,将键值对(省份, (广告, 点击数量))进行重新封装。
- 将结果按省份进行分组,使用 groupByKey 操作,得到每个省份对应的广告点击数据列表,键为省份,值为包含广告和点击数量的列表。
- 对每个省份的广告点击数据列表进行处理,通过 mapValues 操作,对每个列表进行排序、取 Top3 操作,得到每个省份的 Top3 广告点击数据。
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 创建 Spark 上下文环境对象(连接对象) val sc: SparkContext = new SparkContext(sparkConf) // 读取agent.log文件并创建RDD val logData = sc.textFile("datas/agent.log") // 提取省份、广告数据 => ((省份,广告), 1) val extractedData = logData.map( line => { val datas = line.split(" ") ((datas(1), datas(4)), 1) } ) // 统计每个省份每个广告的点击数量 => ((省份,广告), sum) val adClickCounts = extractedData.reduceByKey(_ + _) // 将聚合结果进行结构转换 => (省份, (广告, sum)) val adClickCounts2 = adClickCounts.map { case ((province, ad), count) => (province, (ad, count)) } // 按省份分组 => (省份, [(广告1, sum1),(广告2, sum2)]) val clicksByProvince = adClickCounts2.groupByKey() // 取每个省份的Top3广告点击数量 val top3ClicksByProvince = clicksByProvince.mapValues( iter => { iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3) } ) // 打印结果 top3ClicksByProvince.collect().foreach(println) //关闭 Spark sc.stop() } }
(2)运行结果如下:
全部评论(0)