Spark - SparkSQL使用详解7(案例实操1:各区域热门商品Top3)
作者:hangge | 2023-12-01 09:10
七、案例实操1:各区域热门商品 Top3
1,数据说明
(1)首先 user_visit_action.txt 文件中存放了所有用户的行为记录,下面是截取其中的一部分内容:

- 文件中每行数据的详细字段说明如下:
| 编号 | 字段名称 | 字段类型 | 字段含义 |
| 1 | date | String | 用户点击行为的日期 |
| 2 | user_id | Long | 用户的 ID |
| 3 | session_id | String | Session 的 ID |
| 4 | page_id | Long | 某个页面的 ID |
| 5 | action_time | String | 动作的时间点 |
| 6 | search_keyword | String | 用户搜索的关键词 |
| 7 | click_category_id | Long | 某一个商品品类的 ID |
| 8 | click_product_id | Long | 某一个商品的 ID |
| 9 | order_category_ids | String | 一次订单中所有品类的 ID 集合 |
| 10 | order_product_ids | String | 一次订单中所有商品的 ID 集合 |
| 11 | pay_category_ids | String | 一次支付中所有品类的 ID 集合 |
| 12 | pay_product_ids | String | 一次支付中所有商品的 ID 集合 |
| 13 | city_id | Long | 城市 id |
- 完整数据:
数据.zip
(2)文件 product_info.txt 中则存放的是商品信息,下面是该文件的部分内容:
(3)文件 city_info.txt 中则存放的是城市信息,下面是该文件的部分内容:
(2)运行结果如下说明数据已经准备完毕:

1 商品_1 自营 2 商品_2 自营 3 商品_3 第三方 4 商品_4 自营 5 商品_5 自营
(3)文件 city_info.txt 中则存放的是城市信息,下面是该文件的部分内容:
1 北京 华北 2 上海 华东 3 深圳 华南 4 广州 华南 5 武汉 华中 6 南京 华东 7 天津 华北
2,数据准备
(1)在实现具体需求之前,我们首先需要创建相应的 Hive 表,并将 txt 文件中的数据导入到表中,具体代码如下:
object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(sparkConf)
.enableHiveSupport() // 添加Hive支持
.getOrCreate()
// 导入隐式转换
import spark.implicits._
// 创建3张表并导入初始化数据
spark.sql(
"""
|CREATE TABLE `user_visit_action`(
| `date` string,
| `user_id` bigint,
| `session_id` string,
| `page_id` bigint,
| `action_time` string,
| `search_keyword` string,
| `click_category_id` bigint,
| `click_product_id` bigint,
| `order_category_ids` string,
| `order_product_ids` string,
| `pay_category_ids` string,
| `pay_product_ids` string,
| `city_id` bigint)
|row format delimited fields terminated by '\t'
""".stripMargin)
spark.sql(
"""
|load data local inpath 'datas/user_visit_action.txt' into table user_visit_action
""".stripMargin)
spark.sql(
"""
|CREATE TABLE `product_info`(
| `product_id` bigint,
| `product_name` string,
| `extend_info` string)
|row format delimited fields terminated by '\t'
""".stripMargin)
spark.sql(
"""
|load data local inpath 'datas/product_info.txt' into table product_info
""".stripMargin)
spark.sql(
"""
|CREATE TABLE `city_info`(
| `city_id` bigint,
| `city_name` string,
| `area` string)
|row format delimited fields terminated by '\t'
""".stripMargin)
spark.sql(
"""
|load data local inpath 'datas/city_info.txt' into table city_info
""".stripMargin)
// 查看目前所有的Hive表
spark.sql("show tables").show()
// 查看其中 city_info 表的模式信息,包括列名、数据类型和其他元数据信息
spark.sql("DESCRIBE user_visit_action").show()
// 查看 city_info 表的数据
spark.sql("""select * from city_info""").show
//关闭 Spark
spark.stop()
}
}
(2)运行结果如下说明数据已经准备完毕:

(3)由于我使用的是内置的 Hive,可以看到项目根目录下会出现一个 spark-warehouse 文件夹,该文件夹下的的每个子文件夹是一张表,子文件夹名为表名,里面则是表的数据文件。

3,需求说明
(1)我们需要计算各个区域前三大热门商品(这里的热门商品是从点击量的维度来看),并备注上每个商品在主要城市中的分布比例,超过两个城市用其他显示。最终结果类似如下形式:
| 地区 | 商品名称 | 点击次数 | 城市备注 |
| 华北 | 商品 A | 100000 | 北京 21.2%,天津 13.2%,其他 65.6% |
| 华北 | 商品 P | 80200 | 北京 63.0%,太原 10%,其他 27.0% |
| 华北 | 商品 M | 40000 | 北京 63.0%,太原 10%,其他 27.0% |
| 东北 | 商品 J | 92000 | 大连 28%,辽宁 17.0%,其他 55.0% |
(2)要实现该需求,大概思路如下:
- 查询出来所有的点击记录,并与 city_info 表连接,得到每个城市所在的地区,与 Product_info 表连接得到产品名称
- 按照地区和商品 id 分组,统计出每个商品在每个地区的总点击次数
- 每个地区内按照点击次数降序排列
- 只取前三名
- 城市备注需要自定义 UDAF 函数
4,功能实现
(1)首先我们自定义一个聚合函数 CityRemarkUDAF,用于计算城市的点击数量并生成城市备注信息。
// 自定义聚合函数的缓冲区数据类型
case class Buffer( var total : Long, var cityMap:mutable.Map[String, Long] )
// 自定义聚合函数:实现城市备注功能
// 1. 继承Aggregator, 定义泛型
// IN : 城市名称
// BUF : Buffer =>【总点击数量,Map[(city, cnt), (city, cnt)]】
// OUT : 备注信息
// 2. 重写6个方法
class CityRemarkUDAF extends Aggregator[String, Buffer, String]{
// 缓冲区初始化
override def zero: Buffer = {
Buffer(0, mutable.Map[String, Long]())
}
// 更新缓冲区数据
override def reduce(buff: Buffer, city: String): Buffer = {
buff.total += 1
val newCount = buff.cityMap.getOrElse(city, 0L) + 1
buff.cityMap.update(city, newCount)
buff
}
// 合并缓冲区数据
override def merge(buff1: Buffer, buff2: Buffer): Buffer = {
buff1.total += buff2.total
val map1 = buff1.cityMap
val map2 = buff2.cityMap
// 两个Map的合并操作
map2.foreach{
case (city , cnt) => {
val newCount = map1.getOrElse(city, 0L) + cnt
map1.update(city, newCount)
}
}
buff1.cityMap = map1
buff1
}
// 将统计的结果生成字符串信息
override def finish(buff: Buffer): String = {
val remarkList = ListBuffer[String]()
val totalcnt = buff.total
val cityMap = buff.cityMap
// 降序排列
val cityCntList = cityMap.toList.sortWith(
(left, right) => {
left._2 > right._2
}
).take(2)
val hasMore = cityMap.size > 2
var rsum = 0L
cityCntList.foreach{
case ( city, cnt ) => {
val r = cnt * 100 / totalcnt
remarkList.append(s"${city} ${r}%")
rsum += r
}
}
if ( hasMore ) {
remarkList.append(s"其他 ${100 - rsum}%")
}
remarkList.mkString(", ")
}
// Spark 用于序列化缓冲区的编码器
override def bufferEncoder: Encoder[Buffer] = Encoders.product
// Spark 用于序列化输出结果的编码器
override def outputEncoder: Encoder[String] = Encoders.STRING
}
(2)接着我们程序中通过如下步骤进行统计计算:
- 首先将用户行为数据、产品信息、城市信息三张表进行 JOIN,并筛选出 click_product_id > -1 的记录。将查询结果命名为 t1。
- 接着使用用户自定义的 UDAF(CityRemarkUDAF),实现了对区域内商品点击数量的聚合,并生成城市备注信息。将聚合结果命名为 t2。
- 然后使用 Spark SQL 的窗口函数 rank(),在每个区域内对商品的点击数量进行排名。将排名结果命名为 t3。
- 最后从 t3 表中选择排名在前三位的数据,并显示在控制台上。
object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(sparkConf)
.enableHiveSupport() // 添加Hive支持
.getOrCreate()
// 查询基本数据
spark.sql(
"""
| select
| a.*,
| p.product_name,
| c.area,
| c.city_name
| from user_visit_action a
| join product_info p on a.click_product_id = p.product_id
| join city_info c on a.city_id = c.city_id
| where a.click_product_id > -1
""".stripMargin).createOrReplaceTempView("t1")
// 根据区域,商品进行数据聚合
spark.udf.register("cityRemark", functions.udaf(new CityRemarkUDAF()))
spark.sql(
"""
| select
| area,
| product_name,
| count(*) as clickCnt,
| cityRemark(city_name) as city_remark
| from t1 group by area, product_name
""".stripMargin).createOrReplaceTempView("t2")
// 区域内对点击数量进行排行
spark.sql(
"""
| select
| *,
| rank() over( partition by area order by clickCnt desc ) as rank
| from t2
""".stripMargin).createOrReplaceTempView("t3")
// 取前3名
spark.sql(
"""
| select
| *
| from t3 where rank <= 3
""".stripMargin).show(false)
//关闭 Spark
spark.stop()
}
}
(3)运行结果如下:
全部评论(0)