Spark - RDD使用详解13(案例实操2:统计Top10热门品类)
作者:hangge | 2023-11-14 08:40
十六、案例实操2:统计 Top10 热门品类
1,数据准备
(1)我们有一个电商网站的用户行为数据文件 user_visit_action.txt,下面是截取里面一部分内容:
- 数据文件中每行数据采用逗号分隔数据
- 每一行数据表示用户的一次行为,这个行为只能是 4 种行为的一种
- 如果搜索关键字为 null,表示数据不是搜索数据
- 如果点击的品类 ID 和产品 ID 为 -1,表示数据不是点击数据
- 针对于下单行为,一次可以下单多个商品,所以品类 ID 和产品 ID 可以是多个,id 之间采用中划线分隔,如果本次不是下单行为,则数据采用 null 表示
- 支付行为和下单行为类似
(3)每行数据的详细字段说明:
| 编号 | 字段名称 | 字段类型 | 字段含义 |
| 1 | date | String | 用户点击行为的日期 |
| 2 | user_id | Long | 用户的 ID |
| 3 | session_id | String | Session 的 ID |
| 4 | page_id | Long | 某个页面的 ID |
| 5 | action_time | String | 动作的时间点 |
| 6 | search_keyword | String | 用户搜索的关键词 |
| 7 | click_category_id | Long | 某一个商品品类的 ID |
| 8 | click_product_id | Long | 某一个商品的 ID |
| 9 | order_category_ids | String | 一次订单中所有品类的 ID 集合 |
| 10 | order_product_ids | String | 一次订单中所有商品的 ID 集合 |
| 11 | pay_category_ids | String | 一次支付中所有品类的 ID 集合 |
| 12 | pay_product_ids | String | 一次支付中所有商品的 ID 集合 |
| 13 | city_id | Long | 城市 ID |
2,需求描述
(1)品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的公司可能对热门的定义不一样。我们按照每个品类的点击、下单、支付的量来统计热门品类。- 鞋 点击数 下单数 支付数
- 衣服 点击数 下单数 支付数
- 电脑 点击数 下单数 支付数
- ......
(2)我们需要统计出 Top10 的热门品类,分别实现如下两种排序方法:
- 先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。
- 根据权重计算综合排名进行排序,综合排名 = 点击数 * 20% + 下单数 * 30% + 支付数 * 50%
3,实现代码
(1)下面代码我们先统计出各个品类的点击数、下单数、支付数,然后依次根据点击数、下单数、支付数进行降序排列,最后取出前 10 条数据。
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 创建 Spark 上下文环境对象(连接对象)
val sc: SparkContext = new SparkContext(sparkConf)
// 读取用户行为数据文件
val fileRDD = sc.textFile("datas/user_visit_action.txt")
// 将数据转换结构
// 点击的场合 : ( 品类ID,( 1, 0, 0 ) )
// 下单的场合 : ( 品类ID,( 0, 1, 0 ) )
// 支付的场合 : ( 品类ID,( 0, 0, 1 ) )
val flatRDD: RDD[(String, (Int, Int, Int))] = fileRDD.flatMap(
action => {
val datas = action.split(",")
if (datas(6) != "-1") {
// 点击的场合
List((datas(6), (1, 0, 0)))
} else if (datas(8) != "null") {
// 下单的场合
val ids = datas(8).split(",")
ids.map(id => (id, (0, 1, 0)))
} else if (datas(10) != "null") {
// 支付的场合
val ids = datas(10).split(",")
ids.map(id => (id, (0, 0, 1)))
} else {
Nil
}
}
)
// 将相同的品类ID的数据进行分组聚合
// (品类ID,(点击数量, 下单数量, 支付数量))
val analysisRDD = flatRDD.reduceByKey(
(t1, t2) => {
( t1._1+t2._1, t1._2 + t2._2, t1._3 + t2._3 )
}
)
// 按照点击数、下单数、支付数
val sortedCategories = analysisRDD.sortBy(_._2, false)
// 获取Top 10热门品类
val top10Categories = sortedCategories.take(10)
// 输出结果
top10Categories.foreach {
case (category, (clickCount, orderCount, payCount)) =>
println(s"品类: $category, 点击数: $clickCount, 下单数: $orderCount, 支付数: $payCount")
}
//关闭 Spark
sc.stop()
(2)如果想要按照综合排名进行排序,则只需修改如下高亮代码:
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 创建 Spark 上下文环境对象(连接对象)
val sc: SparkContext = new SparkContext(sparkConf)
// 读取用户行为数据文件
val fileRDD = sc.textFile("datas/user_visit_action.txt")
// 将数据转换结构
// 点击的场合 : ( 品类ID,( 1, 0, 0 ) )
// 下单的场合 : ( 品类ID,( 0, 1, 0 ) )
// 支付的场合 : ( 品类ID,( 0, 0, 1 ) )
val flatRDD: RDD[(String, (Int, Int, Int))] = fileRDD.flatMap(
action => {
val datas = action.split(",")
if (datas(6) != "-1") {
// 点击的场合
List((datas(6), (1, 0, 0)))
} else if (datas(8) != "null") {
// 下单的场合
val ids = datas(8).split(",")
ids.map(id => (id, (0, 1, 0)))
} else if (datas(10) != "null") {
// 支付的场合
val ids = datas(10).split(",")
ids.map(id => (id, (0, 0, 1)))
} else {
Nil
}
}
)
// 将相同的品类ID的数据进行分组聚合
// (品类ID,(点击数量, 下单数量, 支付数量))
val analysisRDD = flatRDD.reduceByKey(
(t1, t2) => {
( t1._1+t2._1, t1._2 + t2._2, t1._3 + t2._3 )
}
)
// 按照综合排名进行排序
val sortedCategories = analysisRDD.sortBy(categoryCount => {
val (clickCount, orderCount, payCount) = categoryCount._2
clickCount * 0.2 + orderCount * 0.3 + payCount * 0.5
}, ascending = false)
// 获取Top 10热门品类
val top10Categories = sortedCategories.take(10)
// 输出结果
top10Categories.foreach {
case (category, (clickCount, orderCount, payCount)) =>
println(s"品类: $category, 点击数: $clickCount, 下单数: $orderCount, 支付数: $payCount")
}
//关闭 Spark
sc.stop()
附:统计 Top10 热门品类中每个品类的 Top10 活跃 Session
1,需求说明
再完成上面 Top10 热门品类的基础上,我们还需要增加每个品类用户 session 的点击统计,从而得到每个品类的 Top10 活跃 Session。
2,实现代码
(1)下面高亮部分即为针对该需求新增的代码逻辑:
/********** 需求1 ************/
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 创建 Spark 上下文环境对象(连接对象)
val sc: SparkContext = new SparkContext(sparkConf)
// 读取用户行为数据文件
val fileRDD = sc.textFile("datas/user_visit_action.txt")
// 缓存数据
fileRDD.cache()
// 将数据转换结构
// 点击的场合 : ( 品类ID,( 1, 0, 0 ) )
// 下单的场合 : ( 品类ID,( 0, 1, 0 ) )
// 支付的场合 : ( 品类ID,( 0, 0, 1 ) )
val flatRDD: RDD[(String, (Int, Int, Int))] = fileRDD.flatMap(
action => {
val datas = action.split(",")
if (datas(6) != "-1") {
// 点击的场合
List((datas(6), (1, 0, 0)))
} else if (datas(8) != "null") {
// 下单的场合
val ids = datas(8).split(",")
ids.map(id => (id, (0, 1, 0)))
} else if (datas(10) != "null") {
// 支付的场合
val ids = datas(10).split(",")
ids.map(id => (id, (0, 0, 1)))
} else {
Nil
}
}
)
// 将相同的品类ID的数据进行分组聚合
// (品类ID,(点击数量, 下单数量, 支付数量))
val analysisRDD = flatRDD.reduceByKey(
(t1, t2) => {
( t1._1+t2._1, t1._2 + t2._2, t1._3 + t2._3 )
}
)
// 按照点击数、下单数、支付数
val sortedCategories = analysisRDD.sortBy(_._2, false)
// 获取Top 10热门品类
val top10Categories = sortedCategories.take(10)
// 输出结果
/**
top10Categories.foreach {
case (category, (clickCount, orderCount, payCount)) =>
println(s"品类: $category, 点击数: $clickCount, 下单数: $orderCount, 支付数: $payCount")
}**/
/********** 需求2 ************/
// 获取top10的品类ID
val top10Ids: Array[String] = top10Categories.map(_._1)
// 过滤原始数据,保留点击和前10品类ID
val filterActionRDD = fileRDD.filter(
action => {
val datas = action.split(",")
if ( datas(6) != "-1" ) {
top10Ids.contains(datas(6))
} else {
false
}
}
)
// 根据品类ID和sessionid进行点击量的统计
// ((品类ID,sessionId),sum)
val reduceRDD: RDD[((String, String), Int)] = filterActionRDD.map(
action => {
val datas = action.split(",")
((datas(6), datas(2)), 1)
}
).reduceByKey(_ + _)
// 将统计的结果进行结构的转换
// ((品类ID,sessionId),sum) => (品类ID, (sessionId, sum))
val mapRDD = reduceRDD.map{
case ( (cid, sid), sum ) => {
( cid, (sid, sum) )
}
}
// 相同的品类进行分组
val groupRDD: RDD[(String, Iterable[(String, Int)])] = mapRDD.groupByKey()
// 将分组后的数据进行点击量的排序,取前10名
val resultRDD = groupRDD.mapValues(
iter => {
iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)
}
)
// 打印结果
resultRDD.collect().foreach(println)
//关闭 Spark
sc.stop()
(2)运行结果如下:

全部评论(0)