Spark - Spark Streaming使用详解13(案例实操3:广告点击量实时统计)
作者:hangge | 2023-12-27 08:50
十三、案例实操3:广告点击量实时统计
1,需求说明
(1)实时统计每天各地区各城市各广告的点击总流量,并将其存入 MySQL。
(2)该需求实现步骤如下:
- 单个批次内对数据进行按照天维度的聚合统计
- 结合 MySQL 数据跟当前批次数据更新原有的数据
2,准备工作
(1)首先我们需要一个实时数据生成器,用于不断的生成用户点击广告数据并推送到 Kafka 中,具体可以参考我之前的问题:
(2)接着我们项目需要添加 Kafka、MySQL 相关依赖:
<!-- streaming-kafka依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.13</artifactId>
<version>3.4.0</version>
</dependency>
<!-- 数据库驱动依赖 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
<!-- 数据库连接池 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.22</version>
</dependency>
(3)同时我们还需要创建 1 张 MySQL 数据表,用于存放每日的广告点击量:
CREATE TABLE area_city_ad_count ( dt VARCHAR(20), area VARCHAR(20), city VARCHAR(20), adid VARCHAR(20), count BIGINT, PRIMARY KEY (dt,area,city,adid) );
3,编写工具类
(1)由于我们的数据源是 Kafka,这里我们编写一个 MyKafkaUtil 工具类,用于创建一个 读取 Kafka 数据的 SparkStreaming:
object MyKafkaUtil {
// kafka 消费者配置
val kafkaParam = Map(
"bootstrap.servers" -> "192.168.60.9:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
//消费者组
"group.id" -> "commerce-consumer-group",
//如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性
//可以使用这个配置,latest 自动重置偏移量为最新的偏移量
"auto.offset.reset" -> "latest",
//如果是 true,则这个消费者的偏移量会在后台自动提交,但是 kafka 宕机容易丢失数据
//如果是 false,会需要手动维护 kafka 偏移量
"enable.auto.commit" -> (true: java.lang.Boolean)
)
// 创建 DStream,返回接收到的输入数据
// LocationStrategies:根据给定的主题和集群地址创建 consumer
// LocationStrategies.PreferConsistent:持续的在所有 Executor 之间分配分区
// ConsumerStrategies:选择如何在 Driver 和 Executor 上创建和配置 Kafka Consumer
// ConsumerStrategies.Subscribe:订阅一系列主题
def getKafkaStream(topic: String, ssc: StreamingContext):
InputDStream[ConsumerRecord[String, String]] = {
val dStream: InputDStream[ConsumerRecord[String, String]] =
KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,
String](Array(topic), kafkaParam))
dStream
}
}
(2)同时我们编写一个 JdbcUtil 工具类,方便我们对 MySQL 表数据进行增删改查操作:
object JdbcUtil {
//初始化连接池
var dataSource: DataSource = init()
//初始化连接池方法
def init(): DataSource = {
val properties = new Properties()
properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
properties.setProperty("url", "jdbc:mysql://192.168.60.1:3306/hangge")
properties.setProperty("username", "root")
properties.setProperty("password", "hangge1234")
DruidDataSourceFactory.createDataSource(properties)
}
//获取 MySQL 连接
def getConnection: Connection = {
dataSource.getConnection
}
//执行 SQL 语句,单条数据插入
def executeUpdate(connection: Connection, sql: String, params: Array[Any]): Int
= {
var rtn = 0
var pstmt: PreparedStatement = null
try {
connection.setAutoCommit(false)
pstmt = connection.prepareStatement(sql)
if (params != null && params.length > 0) {
for (i <- params.indices) {
pstmt.setObject(i + 1, params(i))
}
}
rtn = pstmt.executeUpdate()
connection.commit()
pstmt.close()
} catch {
case e: Exception => e.printStackTrace()
}
rtn
}
//执行 SQL 语句,批量数据插入
def executeBatchUpdate(connection: Connection, sql: String, paramsList:
Iterable[Array[Any]]): Array[Int] = {
var rtn: Array[Int] = null
var pstmt: PreparedStatement = null
try {
connection.setAutoCommit(false)
pstmt = connection.prepareStatement(sql)
for (params <- paramsList) {
if (params != null && params.length > 0) {
for (i <- params.indices) {
pstmt.setObject(i + 1, params(i))
}
pstmt.addBatch()
}
}
rtn = pstmt.executeBatch()
connection.commit()
pstmt.close()
} catch {
case e: Exception => e.printStackTrace()
}
rtn
}
//判断一条数据是否存在
def isExist(connection: Connection, sql: String, params: Array[Any]): Boolean =
{
var flag: Boolean = false
var pstmt: PreparedStatement = null
try {
pstmt = connection.prepareStatement(sql)
for (i <- params.indices) {
pstmt.setObject(i + 1, params(i))
}
flag = pstmt.executeQuery().next()
pstmt.close()
} catch {
case e: Exception => e.printStackTrace()
}
flag
}
//获取 MySQL 的一条数据
def getDataFromMysql(connection: Connection, sql: String, params: Array[Any]):
Long = {
var result: Long = 0L
var pstmt: PreparedStatement = null
try {
pstmt = connection.prepareStatement(sql)
for (i <- params.indices) {
pstmt.setObject(i + 1, params(i))
}
val resultSet: ResultSet = pstmt.executeQuery()
while (resultSet.next()) {
result = resultSet.getLong(1)
}
resultSet.close()
pstmt.close()
} catch {
case e: Exception => e.printStackTrace()
}
result
}
//主方法,用于测试上述方法
def main(args: Array[String]): Unit = {
}
}
4,编写业务代码
(1)首先我们定义一个用于处理广告点击数据的工具类 DateAreaCityAdCountHandler,主要涉及两个功能:统计每天各大区各个城市广告点击总数并保存至 MySQL 中。
object DateAreaCityAdCountHandler {
//时间格式化对象
private val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
/**
* 统计每天各大区各个城市广告点击总数并保存至 MySQL 中
* @param filterAdsLogDStream 根据黑名单过滤后的数据集
*/
def saveDateAreaCityAdCountToMysql(filterAdsLogDStream: DStream[Ads_log]): Unit = {
//1.统计每天各大区各个城市广告点击总数
val dateAreaCityAdToCount: DStream[((String, String, String, String), Long)] =
filterAdsLogDStream.map(ads_log => {
//a.取出时间戳
val timestamp: Long = ads_log.timestamp
//b.格式化为日期字符串
val dt: String = sdf.format(new Date(timestamp))
//c.组合,返回
((dt, ads_log.area, ads_log.city, ads_log.adid), 1L)
}).reduceByKey(_ + _)
//2.将单个批次统计之后的数据集合 MySQL 数据对原有的数据更新
dateAreaCityAdToCount.foreachRDD(rdd => {
//对每个分区单独处理
rdd.foreachPartition(iter => {
//a.获取连接
val connection: Connection = JdbcUtil.getConnection
//b.写库
iter.foreach { case ((dt, area, city, adid), count) =>
JdbcUtil.executeUpdate(connection,
"""
|INSERT INTO area_city_ad_count (dt,area,city,adid,count)
|VALUES(?,?,?,?,?)
|ON DUPLICATE KEY
|UPDATE count=count+?;
""".stripMargin,
Array(dt, area, city, adid, count, count))
}
//c.释放连接
connection.close()
})
})
}
}
(2)接着则是主程序代码,它读取 Kafka 中的广告点击日志数据流,对数据进行处理、统计、入库:
object RealTimeApp {
def main(args: Array[String]): Unit = {
//1.创建 SparkConf
val sparkConf: SparkConf = new
SparkConf().setMaster("local[*]").setAppName("RealTimeApp")
//2.创建 StreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(3))
//3.读取 Kafka 数据 1583288137305 华南 深圳 4 3
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
MyKafkaUtil.getKafkaStream("my-topic", ssc)
//4.将每一行数据转换为样例类对象
val adsLogDStream: DStream[Ads_log] = kafkaDStream.map(record => {
//a.取出 value 并按照" "切分
val arr: Array[String] = record.value().split(" ")
//b.封装为样例类对象
Ads_log(arr(0).toLong, arr(1), arr(2), arr(3), arr(4))
})
//5.统计每天各大区各个城市广告点击总数并保存至 MySQL 中
DateAreaCityAdCountHandler.saveDateAreaCityAdCountToMysql(adsLogDStream)
//6.开启任务
ssc.start()
ssc.awaitTermination()
}
}
5,运行测试
(1)首先启动我们之前编写的数据生成器程序(点击查看),启动后可以看到程序每隔 2 秒便会产生一批数据:

(2)接着启动本文编写的广告点击量实时统计程序,查看 area_city_ad_count 表可以看到统计数据在不断的更新:
全部评论(0)