返回 导航

Spark

hangge.com

Spark - RDD使用详解15(案例实操4:TopN主播统计)

作者:hangge | 2024-07-10 08:40

十八、案例实操4:TopN 主播统计

1,需求描述

(1)直播平台中有大区这个概念,一个大区下面包含多个国家,我们需要计算每个大区当天金币收入 TopN 的主播。
(2)主播一天可能会开播多次,所以后期在统计主播当天收入的时候是需要把他当天所有直播中的金币收入都计算在内的。

2,数据准备

(1)video_info.log 为主播的开播记录,里面数据为 json 格式。其中包含主播的 iduid、直播间 idvid 、大区:area、视频开播时长:length、增加粉丝数量:follow 等信息。
{"uid":"8407173251001","vid":"14943445328940001","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":101,"share_num":"21","type":"video_info"}
{"uid":"8407173251002","vid":"14943445328940002","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":201,"share_num":"331","type":"video_info"}
{"uid":"8407173251003","vid":"14943445328940003","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":221,"share_num":"321","type":"video_info"}
{"uid":"8407173251004","vid":"14943445328940004","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":401,"share_num":"311","type":"video_info"}
{"uid":"8407173251005","vid":"14943445328940005","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":31,"share_num":"131","type":"video_info"}
{"uid":"8407173251006","vid":"14943445328940006","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":22,"share_num":"3431","type":"video_info"}
{"uid":"8407173251007","vid":"14943445328940007","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":44,"share_num":"131","type":"video_info"}
{"uid":"8407173251008","vid":"14943445328940008","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":66,"share_num":"131","type":"video_info"}
{"uid":"8407173251009","vid":"14943445328940009","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":32,"share_num":"231","type":"video_info"}
{"uid":"8407173251010","vid":"14943445328940010","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":342,"share_num":"431","type":"video_info"}
{"uid":"8407173251011","vid":"14943445328940011","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":223,"share_num":"331","type":"video_info"}
{"uid":"8407173251012","vid":"14943445328940012","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":554,"share_num":"312","type":"video_info"}
{"uid":"8407173251013","vid":"14943445328940013","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":334,"share_num":"321","type":"video_info"}
{"uid":"8407173251014","vid":"14943445328940014","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":653,"share_num":"311","type":"video_info"}
{"uid":"8407173251015","vid":"14943445328940015","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":322,"share_num":"231","type":"video_info"}
{"uid":"8407173251001","vid":"14943445328940016","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":432,"share_num":"531","type":"video_info"}
{"uid":"8407173251005","vid":"14943445328940017","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":322,"share_num":"231","type":"video_info"}
{"uid":"8407173251008","vid":"14943445328940018","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":564,"share_num":"131","type":"video_info"}
{"uid":"8407173251010","vid":"14943445328940019","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":324,"share_num":"231","type":"video_info"}
{"uid":"8407173251015","vid":"14943445328940020","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":532,"share_num":"331","type":"video_info"}

(2)gift_record.log 为用户送礼记录,里面数据也为 json 格式的。其中包含送礼人 iduid,直播间 idvid,礼物 idgood_id,金币数量:gold 等信息。
{"uid":"7201232141001","vid":"14943445328940001","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141002","vid":"14943445328940001","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141003","vid":"14943445328940002","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141004","vid":"14943445328940002","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141005","vid":"14943445328940003","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141006","vid":"14943445328940003","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141007","vid":"14943445328940004","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141008","vid":"14943445328940004","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141009","vid":"14943445328940005","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141010","vid":"14943445328940005","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141011","vid":"14943445328940006","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141012","vid":"14943445328940006","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141013","vid":"14943445328940007","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141014","vid":"14943445328940007","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141015","vid":"14943445328940008","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141016","vid":"14943445328940008","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141017","vid":"14943445328940009","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141018","vid":"14943445328940009","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141019","vid":"14943445328940010","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141020","vid":"14943445328940010","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141021","vid":"14943445328940011","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141022","vid":"14943445328940011","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141023","vid":"14943445328940012","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141024","vid":"14943445328940012","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141025","vid":"14943445328940013","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141026","vid":"14943445328940013","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141027","vid":"14943445328940014","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141028","vid":"14943445328940014","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141029","vid":"14943445328940015","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141030","vid":"14943445328940015","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141031","vid":"14943445328940016","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141032","vid":"14943445328940016","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141033","vid":"14943445328940017","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141034","vid":"14943445328940017","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141035","vid":"14943445328940018","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141036","vid":"14943445328940018","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141037","vid":"14943445328940019","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141038","vid":"14943445328940019","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141039","vid":"14943445328940020","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141040","vid":"14943445328940020","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}

3,实现思路

(1)首先获取两份数据中的核心字段,使用 fastjson 包解析数据:
  • 主播开播记录(主播 IDuid,直播间 IDvid,大区:area)转换为 (vid,(uid,area)) 格式
  • 用户送礼记录(直播间 IDvid,金币数量:gold)转换为 (vid,gold) 格式

(2)因为用户可能在一次直播中给主播送多次礼物,对用户送礼记录数据进行聚合,对相同 vid 的数据求和。最终得到如下格式数据:
  • (vid,gold_sum)

(3)把这两份数据 join 到一块,vid 作为 join key。最终得到如下格式数据:
  • (vid,((uid,area),gold_sum))

(4)由于一个主播一天可能会开播多次,因此需要基于 uid area 再做一次聚合。首先使用 map 迭代 join 之后的数据,得到如下格式数据:
  • ((uid,area),gold_sum)

(5)然后使用 reduceByKey 算子对数据进行聚合,得到如下格式数据:
  • ((uid,area),gold_sum_all)

(6)因为我们要分区统计 TopN,先使用 map 转换成 (area,(uid,gold_sum_all)) 格式,然后使用 groupByKey 对大区进行分组,得到如下格式数据:
  • area,<(uid,gold_sum_all),(uid,gold_sum_all),(uid,gold_sum_all)>

(7)使用 map 迭代每个分组内的数据,按金币数量倒序排序,取前 N 个,最终输出如下数据格式。这个 TopN 其实就是把前几名主播的 id 还有金币数量拼接成一个字符串。
  • (area,topN)

(8)最后使用 foreach 将结果打印到控制台,多个字段使用制表符分割。
  • area topN

4,实现代码

(1)由于我们需要读取 json 格式的数据,所以项目 pom.xml 文件中除了添加 spark-core 依赖外,还需要添加 fastjson 依赖。
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.4.1</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.68</version>
    </dependency>
</dependencies>

(2)完整的代码如下:
import com.alibaba.fastjson.JSON
import org.apache.spark.{SparkConf, SparkContext}

object TopNScala {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("TopNScala")
      .setMaster("local")
    val sc = new SparkContext(conf)
    //1:首先获取两份数据中的核心字段,使用fastjson包解析数据
    val videoInfoRDD = sc.textFile("D:\\temp\\video_info.log")
    val giftRecordRDD = sc.textFile("D:\\temp\\gift_record.log")

    //(vid,(uid,area))
    val videoInfoFieldRDD = videoInfoRDD.map(line=>{
      val jsonObj = JSON.parseObject(line)
      val vid = jsonObj.getString("vid")
      val uid = jsonObj.getString("uid")
      val area = jsonObj.getString("area")
      (vid,(uid,area))
    })

    //(vid,gold)
    val giftRecordFieldRDD = giftRecordRDD.map(line=>{
      val jsonObj = JSON.parseObject(line)
      val vid = jsonObj.getString("vid")
      val gold = Integer.parseInt(jsonObj.getString("gold"))
      (vid,gold)
    })

    //2:对用户送礼记录数据进行聚合,对相同vid的数据求和
    //(vid,gold_sum)
    val giftRecordFieldAggRDD = giftRecordFieldRDD.reduceByKey(_ + _)

    //3:把这两份数据join到一块,vid作为join的key
    //(vid,((uid,area),gold_sum))
    val joinRDD = videoInfoFieldRDD.join(giftRecordFieldAggRDD)

    //4:使用map迭代join之后的数据,最后获取到uid、area、gold_sum字段
    //((uid,area),gold_sum)
    val joinMapRDD = joinRDD.map(tup=>{
      //joinRDD: (vid,((uid,area),gold_sum))
      //获取uid
      val uid = tup._2._1._1
      //获取area
      val area = tup._2._1._2
      //获取gold_sum
      val gold_sum = tup._2._2
      ((uid,area),gold_sum)
    })

    //5:使用reduceByKey算子对数据进行聚合
    //((uid,area),gold_sum_all)
    val reduceRDD = joinMapRDD.reduceByKey(_ + _)

    //6:接下来对需要使用groupByKey对数据进行分组,所以先使用map进行转换
    //map:(area,(uid,gold_sum_all))
    //groupByKey: area,<(uid,gold_sum_all),(uid,gold_sum_all),(uid,gold_sum_all)>
    val groupRDD = reduceRDD.map(tup=>(tup._1._2,(tup._1._1,tup._2))).groupByKey()

    //7:使用map迭代每个分组内的数据,按照金币数量倒序排序,取前N个,最终输出area,topN
    //(area,topN)
    val top3RDD = groupRDD.map(tup=>{
      val area = tup._1
      //toList:把iterable转成list
      //sortBy:排序,默认是正序
      //reverse:反转,实现倒序效果
      //take(3):取前3个元素
      //mkString:使用指定字符把集合转成字符串
      //uid:gold_sum_all,uid:gold_sum_all,uid:gold_sum_all
      val top3 = tup._2.toList.sortBy(_._2).reverse.take(3).map(tup=>tup._1+":"+tup._2).mkString(",")
      (area,top3)
    })

    //8:使用foreach将结果打印到控制台,多个字段使用制表符分割
    top3RDD.foreach(tup=>println(tup._1+"\t"+tup._2))

    sc.stop()
  }
}

(3)程序运行结果如下:
评论

全部评论(0)

回到顶部