
Spark - SparkSQL Usage in Detail 8 (Hands-on Case 2: TopN Anchor Statistics)

Author: hangge | 2024-07-15 08:37
    I previously wrote an article showing how to use Spark RDD transformation operators to compute, for each region, the TopN anchors by gold-coin income for the day (click to view). That implementation is fairly cumbersome and requires a relatively large amount of code. In this article I will show how to implement the same functionality with Spark SQL, which turns out to be much simpler.

8. Hands-on Case 2: TopN Anchor Statistics

1. Requirement Description

(1) The live-streaming platform has the concept of a region: one region contains multiple countries. We need to compute, for each region, the TopN anchors by gold-coin income for the day.
(2) An anchor may go live several times in a day, so when calculating an anchor's daily income we must add up the gold coins received across all of that day's streams.

2. Data Preparation

(1) video_info.log contains the anchors' streaming records in JSON format. Each record includes the anchor ID (uid), the live-room ID (vid), the region (area), plus other fields such as the stream status, start/end timestamps, watch count, and share count.
{"uid":"8407173251001","vid":"14943445328940001","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":101,"share_num":"21","type":"video_info"}
{"uid":"8407173251002","vid":"14943445328940002","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":201,"share_num":"331","type":"video_info"}
{"uid":"8407173251003","vid":"14943445328940003","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":221,"share_num":"321","type":"video_info"}
{"uid":"8407173251004","vid":"14943445328940004","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":401,"share_num":"311","type":"video_info"}
{"uid":"8407173251005","vid":"14943445328940005","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":31,"share_num":"131","type":"video_info"}
{"uid":"8407173251006","vid":"14943445328940006","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":22,"share_num":"3431","type":"video_info"}
{"uid":"8407173251007","vid":"14943445328940007","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":44,"share_num":"131","type":"video_info"}
{"uid":"8407173251008","vid":"14943445328940008","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":66,"share_num":"131","type":"video_info"}
{"uid":"8407173251009","vid":"14943445328940009","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":32,"share_num":"231","type":"video_info"}
{"uid":"8407173251010","vid":"14943445328940010","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":342,"share_num":"431","type":"video_info"}
{"uid":"8407173251011","vid":"14943445328940011","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":223,"share_num":"331","type":"video_info"}
{"uid":"8407173251012","vid":"14943445328940012","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":554,"share_num":"312","type":"video_info"}
{"uid":"8407173251013","vid":"14943445328940013","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":334,"share_num":"321","type":"video_info"}
{"uid":"8407173251014","vid":"14943445328940014","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":653,"share_num":"311","type":"video_info"}
{"uid":"8407173251015","vid":"14943445328940015","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":322,"share_num":"231","type":"video_info"}
{"uid":"8407173251001","vid":"14943445328940016","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":432,"share_num":"531","type":"video_info"}
{"uid":"8407173251005","vid":"14943445328940017","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":322,"share_num":"231","type":"video_info"}
{"uid":"8407173251008","vid":"14943445328940018","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":564,"share_num":"131","type":"video_info"}
{"uid":"8407173251010","vid":"14943445328940019","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":324,"share_num":"231","type":"video_info"}
{"uid":"8407173251015","vid":"14943445328940020","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":532,"share_num":"331","type":"video_info"}

(2) gift_record.log contains users' gift-sending records, also in JSON format. Each record includes the sender ID (uid), the live-room ID (vid), the gift ID (good_id), the gold-coin amount (gold), and other fields.
{"uid":"7201232141001","vid":"14943445328940001","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141002","vid":"14943445328940001","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141003","vid":"14943445328940002","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141004","vid":"14943445328940002","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141005","vid":"14943445328940003","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141006","vid":"14943445328940003","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141007","vid":"14943445328940004","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141008","vid":"14943445328940004","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141009","vid":"14943445328940005","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141010","vid":"14943445328940005","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141011","vid":"14943445328940006","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141012","vid":"14943445328940006","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141013","vid":"14943445328940007","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141014","vid":"14943445328940007","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141015","vid":"14943445328940008","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141016","vid":"14943445328940008","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141017","vid":"14943445328940009","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141018","vid":"14943445328940009","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141019","vid":"14943445328940010","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141020","vid":"14943445328940010","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141021","vid":"14943445328940011","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141022","vid":"14943445328940011","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141023","vid":"14943445328940012","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141024","vid":"14943445328940012","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141025","vid":"14943445328940013","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141026","vid":"14943445328940013","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141027","vid":"14943445328940014","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141028","vid":"14943445328940014","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141029","vid":"14943445328940015","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141030","vid":"14943445328940015","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141031","vid":"14943445328940016","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141032","vid":"14943445328940016","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141033","vid":"14943445328940017","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141034","vid":"14943445328940017","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141035","vid":"14943445328940018","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141036","vid":"14943445328940018","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141037","vid":"14943445328940019","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141038","vid":"14943445328940019","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141039","vid":"14943445328940020","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141040","vid":"14943445328940020","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}

3. Implementation Approach

(1) Load the JSON data directly with SparkSession's built-in JSON reader:
val videoInfoDf = sparkSession.read.json("D:\\temp\\video_info.log")
val giftRecordDf = sparkSession.read.json("D:\\temp\\gift_record.log")
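By default spark.read.json infers the schema by scanning the files. If you prefer to pin the column types explicitly, you can pass a schema instead. The sketch below is optional and not part of the original flow; the field names come from the sample data above, and the chosen types are an assumption:
import org.apache.spark.sql.types._

// Optional explicit schema for video_info.log (types are assumptions; Spark would otherwise infer them)
val videoInfoSchema = StructType(Seq(
  StructField("uid", StringType),
  StructField("vid", StringType),
  StructField("area", StringType),
  StructField("status", StringType),
  StructField("start_time", StringType),
  StructField("end_time", StringType),
  StructField("watch_num", LongType),
  StructField("share_num", StringType),
  StructField("type", StringType)
))
val videoInfoDfTyped = sparkSession.read.schema(videoInfoSchema).json("D:\\temp\\video_info.log")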

(2) Register temporary views for the two datasets:
videoInfoDf.createOrReplaceTempView("video_info")
giftRecordDf.createOrReplaceTempView("gift_record")
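Optionally, a quick query against the views confirms the data loaded as expected; this sanity check is not part of the original flow:
// Sanity check: number of streaming records per area, and total number of gift records
sparkSession.sql("select area, count(*) as cnt from video_info group by area").show()
sparkSession.sql("select count(*) as gift_cnt from gift_record").show()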

(3) Execute SQL to compute the TopN anchors. The query is built from the following subqueries:
  • t1: join the video_info and gift_record tables to get, for each live room, the anchor (uid), the live-room ID (vid), the region (area), and the total gold coins received by that room (gold_sum).
  • t2: on top of t1, group by anchor (uid) to compute each anchor's total gold coins (gold_sum_all), carrying along the anchor's region (area).
  • t3: on top of t2, use the row_number() window function, partitioned by region (area) and ordered by total gold coins (gold_sum_all) descending, to number each anchor within its region (num).
  • t4: on top of t3, keep the top 3 anchors by gold coins in each region, and concatenate each anchor's uid and total gold coins (gold_sum_all) into a string (topn).
select
	t4.area,
	concat_ws(',',collect_list(t4.topn)) as topn_list
from(
	select 
		t3.area,concat(t3.uid,':',cast(t3.gold_sum_all as int)) as topn
	from(
		select
			t2.uid,t2.area,t2.gold_sum_all,row_number() 
			  over (partition by area order by gold_sum_all desc) as num
		from(
			select
				t1.uid,max(t1.area) as area,sum(t1.gold_sum) as gold_sum_all
			from(
				select
					vi.uid,vi.vid,vi.area,gr.gold_sum
				from
					video_info as vi
				join
				(select
					vid,sum(gold) as gold_sum
				from
					gift_record
				group by vid
				)as gr
				on vi.vid = gr.vid
			) as t1
			group by t1.uid
		) as t2
	)as t3
	where t3.num <=3
) as t4
group by t4.area
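
As a side note, the same logic can also be expressed with the DataFrame API instead of a single SQL string. Below is a minimal sketch assuming the videoInfoDf and giftRecordDf DataFrames loaded in step (1); it illustrates the structure of the query and is not the article's actual implementation:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// gr: total gold per live room (vid)
val giftSum = giftRecordDf.groupBy("vid").agg(sum("gold").as("gold_sum"))

// t1 + t2: join with video_info, then total gold per anchor (uid), keeping its area
val anchorGold = videoInfoDf.join(giftSum, "vid")
  .groupBy("uid")
  .agg(max("area").as("area"), sum("gold_sum").as("gold_sum_all"))

// t3: rank anchors within each area by total gold, descending
val ranked = anchorGold.withColumn("num",
  row_number().over(Window.partitionBy("area").orderBy(col("gold_sum_all").desc)))

// t4: keep the top 3 per area and concatenate "uid:gold" pairs into one string
val topnDf = ranked.filter(col("num") <= 3)
  .select(col("area"), concat(col("uid"), lit(":"), col("gold_sum_all").cast("int")).as("topn"))
  .groupBy("area")
  .agg(concat_ws(",", collect_list("topn")).as("topn_list"))

topnDf.show(false)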

(4) Use foreach to print the result to the console. The final output looks like this:
CN	8407173251008:120,8407173251003:60,8407173251014:50
ID	8407173251005:160,8407173251010:140,8407173251002:70
US	8407173251015:180,8407173251012:70,8407173251001:60

4. Implementation Code

(1) Since we need to read JSON-formatted data, the project's pom.xml adds the fastjson dependency in addition to spark-core and spark-sql. (Note: the Spark SQL code below uses Spark's built-in JSON reader, so fastjson is not actually used by this example.)
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.4.1</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.68</version>
    </dependency>
</dependencies>

(2) The complete code is as follows:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object TopNAnchorScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")

    //Create the SparkSession object, which contains the SparkContext and SQLContext
    val sparkSession = SparkSession.builder()
      .appName("TopNAnchorScala")
      .config(conf)
      .getOrCreate()

    //1: Load the JSON data directly with sparkSession's JSON reader
    val videoInfoDf = sparkSession.read.json("D:\\temp\\video_info.log")
    val giftRecordDf = sparkSession.read.json("D:\\temp\\gift_record.log")

    //2: Register temporary views for the two datasets
    videoInfoDf.createOrReplaceTempView("video_info")
    giftRecordDf.createOrReplaceTempView("gift_record")

    //3: Execute the SQL to compute the TopN anchors
    val sql = "select " +
      "    t4.area, " +
      "    concat_ws(',',collect_list(t4.topn)) as topn_list " +
      "from( " +
      "    select " +
      "        t3.area,concat(t3.uid,':',cast(t3.gold_sum_all as int)) as topn " +
      "    from( " +
      "        select " +
      "            t2.uid,t2.area,t2.gold_sum_all,row_number() " +
      "              over (partition by area order by gold_sum_all desc) as num " +
      "        from( " +
      "            select " +
      "                t1.uid,max(t1.area) as area,sum(t1.gold_sum) as gold_sum_all " +
      "            from( " +
      "                select " +
      "                    vi.uid,vi.vid,vi.area,gr.gold_sum " +
      "                from " +
      "                    video_info as vi " +
      "                join " +
      "                (select " +
      "                    vid,sum(gold) as gold_sum " +
      "                from " +
      "                    gift_record " +
      "                group by vid " +
      "                )as gr " +
      "                on vi.vid = gr.vid " +
      "            ) as t1 " +
      "            group by t1.uid " +
      "        ) as t2 " +
      "    )as t3 " +
      "    where t3.num <=3 " +
      ") as t4 " +
      "group by t4.area";

    val resDf = sparkSession.sql(sql)

    //4: Use foreach to print the result to the console
    resDf.rdd.foreach(row=>println(row.getAs[String]("area")+"\t"+row.getAs[String]("topn_list")))

    sparkSession.stop()
  }

}

(3) Running the program prints the per-region TopN result to the console (the same as the sample output shown in the previous section).

Appendix: Saving the Result to HDFS

(1) In the example above we printed the statistics directly to the console. In real applications we usually need to save the result to HDFS; to do that we only need to modify the final output part of the code:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object TopNAnchorScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")

    //Create the SparkSession object, which contains the SparkContext and SQLContext
    val sparkSession = SparkSession.builder()
      .appName("TopNAnchorScala")
      .config(conf)
      .getOrCreate()

    //1: Load the JSON data directly with sparkSession's JSON reader
    val videoInfoDf = sparkSession.read.json("D:\\temp\\video_info.log")
    val giftRecordDf = sparkSession.read.json("D:\\temp\\gift_record.log")

    //2: Register temporary views for the two datasets
    videoInfoDf.createOrReplaceTempView("video_info")
    giftRecordDf.createOrReplaceTempView("gift_record")

    //3: Execute the SQL to compute the TopN anchors
    val sql = "select " +
      "    t4.area, " +
      "    concat_ws(',',collect_list(t4.topn)) as topn_list " +
      "from( " +
      "    select " +
      "        t3.area,concat(t3.uid,':',cast(t3.gold_sum_all as int)) as topn " +
      "    from( " +
      "        select " +
      "            t2.uid,t2.area,t2.gold_sum_all,row_number() " +
      "              over (partition by area order by gold_sum_all desc) as num " +
      "        from( " +
      "            select " +
      "                t1.uid,max(t1.area) as area,sum(t1.gold_sum) as gold_sum_all " +
      "            from( " +
      "                select " +
      "                    vi.uid,vi.vid,vi.area,gr.gold_sum " +
      "                from " +
      "                    video_info as vi " +
      "                join " +
      "                (select " +
      "                    vid,sum(gold) as gold_sum " +
      "                from " +
      "                    gift_record " +
      "                group by vid " +
      "                )as gr " +
      "                on vi.vid = gr.vid " +
      "            ) as t1 " +
      "            group by t1.uid " +
      "        ) as t2 " +
      "    )as t3 " +
      "    where t3.num <=3 " +
      ") as t4 " +
      "group by t4.area";

    val resDf = sparkSession.sql(sql)

    //4: Save the result to HDFS
    resDf.rdd
      .map(row => row.getAs[String]("area") + "\t" + row.getAs[String]("topn_list"))
      .saveAsTextFile("hdfs://node1:9000/out-topn")

    sparkSession.stop()
  }

}

(2) After the job finishes, check the result:
hdfs dfs -cat /out-topn/*
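
As an alternative to converting the DataFrame to an RDD, the result could also be written with the DataFrame writer API. A minimal sketch, where the output path is just an example and should be adjusted to your cluster:
// Write the result as tab-separated text files (output path is an example, not from the article)
resDf.write
  .mode("overwrite")
  .option("sep", "\t")
  .csv("hdfs://node1:9000/out-topn-csv")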