返回 导航

Spark

hangge.com

Spark - Spark Streaming使用详解12(案例实操2:广告黑名单)

作者:hangge | 2023-12-25 08:44

十二、案例实操2:广告黑名单

1,需求说明

(1)我们需要实现实时的动态黑名单机制,即将每天对某个广告点击超过 100 次的用户拉黑(黑名单保存到 MySQL 中)
(2)该需求实现步骤如下:
  • 读取 Kafka 数据之后,对 MySQL 中存储的黑名单数据做校验。
  • 校验通过则对给用户点击广告次数累加 1 并存入 MySQL
  • 存入 MySQL 之后再对数据做校验,如果单日超过 100 次则将该用户加入黑名单。

2,准备工作

(1)首先我们需要一个实时数据生成器,用于不断的生成用户点击广告数据并推送到 Kafka 中,具体可以参考我之前的文章:

(2)接着我们项目需要添加 KafkaMySQL 相关依赖:
<!-- streaming-kafka依赖 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.13</artifactId>
    <version>3.4.0</version>
</dependency>
<!-- 数据库驱动依赖 -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.33</version>
</dependency>
<!-- 数据库连接池 -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>1.1.22</version>
</dependency>

(3)同时我们还需要创建两张 MySQL 数据表,一张是用于存放黑名单用户的表:
CREATE TABLE black_list (userid CHAR(1) PRIMARY KEY);

(4)另一张用于存放单日各用户点击每个广告的次数:
CREATE TABLE user_ad_count (
  dt varchar(255),
  userid CHAR (1),
  adid CHAR (1),
  count BIGINT,
  PRIMARY KEY (dt, userid, adid)
);

3,编写工具类

(1)由于我们的数据源是 Kafka,这里我们编写一个 MyKafkaUtil 工具类,用于创建一个读取 Kafka 数据的 SparkStreaming
object MyKafkaUtil {
  // kafka 消费者配置
  val kafkaParam = Map(
    "bootstrap.servers" -> "192.168.60.9:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    //消费者组
    "group.id" -> "commerce-consumer-group",
    //如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性
    //可以使用这个配置,latest 自动重置偏移量为最新的偏移量
    "auto.offset.reset" -> "latest",
    //如果是 true,则这个消费者的偏移量会在后台自动提交,但是 kafka 宕机容易丢失数据
    //如果是 false,会需要手动维护 kafka 偏移量
    "enable.auto.commit" -> (true: java.lang.Boolean)
  )

  // 创建 DStream,返回接收到的输入数据
  // LocationStrategies:根据给定的主题和集群地址创建 consumer
  // LocationStrategies.PreferConsistent:持续的在所有 Executor 之间分配分区
  // ConsumerStrategies:选择如何在 Driver 和 Executor 上创建和配置 Kafka Consumer
  // ConsumerStrategies.Subscribe:订阅一系列主题
  def getKafkaStream(topic: String, ssc: StreamingContext):
  InputDStream[ConsumerRecord[String, String]] = {
    val dStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](ssc,
        LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,
          String](Array(topic), kafkaParam))
    dStream
  }
}

(2)同时我们编写一个 JdbcUtil 工具类,方便我们对 MySQL 表数据进行增删改查操作:
object JdbcUtil {
  //初始化连接池
  var dataSource: DataSource = init()

  //初始化连接池方法
  def init(): DataSource = {
    val properties = new Properties()
    properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
    properties.setProperty("url", "jdbc:mysql://192.168.60.1:3306/hangge")
    properties.setProperty("username", "root")
    properties.setProperty("password", "hangge1234")
    DruidDataSourceFactory.createDataSource(properties)
  }

  //获取 MySQL 连接
  def getConnection: Connection = {
    dataSource.getConnection
  }

  //执行 SQL 语句,单条数据插入
  def executeUpdate(connection: Connection, sql: String, params: Array[Any]): Int
  = {
    var rtn = 0
    var pstmt: PreparedStatement = null
    try {
      connection.setAutoCommit(false)
      pstmt = connection.prepareStatement(sql)
      if (params != null && params.length > 0) {
        for (i <- params.indices) {
          pstmt.setObject(i + 1, params(i))
        }
      }
      rtn = pstmt.executeUpdate()
      connection.commit()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    rtn
  }

  //执行 SQL 语句,批量数据插入
  def executeBatchUpdate(connection: Connection, sql: String, paramsList:
  Iterable[Array[Any]]): Array[Int] = {
    var rtn: Array[Int] = null
    var pstmt: PreparedStatement = null
    try {
      connection.setAutoCommit(false)
      pstmt = connection.prepareStatement(sql)
      for (params <- paramsList) {
        if (params != null && params.length > 0) {
          for (i <- params.indices) {
            pstmt.setObject(i + 1, params(i))
          }
          pstmt.addBatch()
        }
      }

      rtn = pstmt.executeBatch()
      connection.commit()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    rtn
  }

  //判断一条数据是否存在
  def isExist(connection: Connection, sql: String, params: Array[Any]): Boolean =
  {
    var flag: Boolean = false
    var pstmt: PreparedStatement = null
    try {
      pstmt = connection.prepareStatement(sql)
      for (i <- params.indices) {
        pstmt.setObject(i + 1, params(i))
      }
      flag = pstmt.executeQuery().next()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    flag
  }

  //获取 MySQL 的一条数据
  def getDataFromMysql(connection: Connection, sql: String, params: Array[Any]):
  Long = {
    var result: Long = 0L
    var pstmt: PreparedStatement = null
    try {
      pstmt = connection.prepareStatement(sql)
      for (i <- params.indices) {
        pstmt.setObject(i + 1, params(i))
      }
      val resultSet: ResultSet = pstmt.executeQuery()
      while (resultSet.next()) {
        result = resultSet.getLong(1)
      }
      resultSet.close()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    result
  }
  
  //主方法,用于测试上述方法
  def main(args: Array[String]): Unit = {
  }
}

4,编写业务代码

(1)首先我们定义一个广告点击记录样例类 Ads_log,由于表示每次广告点击的时间、地区、城市、用户 ID、广告 ID
case class Ads_log(timestamp: Long, area: String, city: String, userid: String, adid: String)

(2)接着我们定义一个用于处理广告点击数据的工具类 BlackListHandler,主要涉及两个功能:添加黑名单和过滤黑名单。
object BlackListHandler {
  // 时间格式化对象
  private val sdf = new SimpleDateFormat("yyyy-MM-dd")

  // 添加黑名单
  def addBlackList(filterAdsLogDSteam: DStream[Ads_log]): Unit = {

    // 1.统计当前批次中单日每个用户点击每个广告的总次数
    // 将数据接转换结构 ads_log=>((date,user,adid),1)
    val dateUserAdToOne: DStream[((String, String, String), Long)] =
    filterAdsLogDSteam.map(adsLog => {
      //a.将时间戳转换为日期字符串
      val date: String = sdf.format(new Date(adsLog.timestamp))
      //b.返回值
      ((date, adsLog.userid, adsLog.adid), 1L)
    })

    // 2.统计单日每个用户点击每个广告的总次数
    // 将数据接转换结构 ((date,user,adid),1)=>((date,user,adid),count)
    val dateUserAdToCount: DStream[((String, String, String), Long)] =
      dateUserAdToOne.reduceByKey(_ + _)
    dateUserAdToCount.foreachRDD(rdd => {
      rdd.foreachPartition(iter => {
        val connection: Connection = JdbcUtil.getConnection
        iter.foreach { case ((dt, user, ad), count) =>
          // 将该用户对该广告的点击次数进行累加
          JdbcUtil.executeUpdate(connection,
            """
              |INSERT INTO user_ad_count (dt,userid,adid,count)
              |VALUES (?,?,?,?)
              |ON DUPLICATE KEY
              |UPDATE count=count+?
            """.stripMargin, Array(dt, user, ad, count, count))

          // 获取该用户点击该广告的总次数
          val ct: Long = JdbcUtil.getDataFromMysql(connection,
            "select count from user_ad_count where dt=? and userid=? and adid =?",
            Array(dt, user, ad))

          // 如果点击次数超过等于 30 次,就将该用户加入到黑名单表中
          if (ct >= 30) {
            JdbcUtil.executeUpdate(connection,
              "INSERT INTO black_list (userid) VALUES (?) ON DUPLICATE KEY update userid=?",
              Array(user, user))
          }
        }
        connection.close()
      })
    })
  }

  // 过滤黑名单
  def filterByBlackList(adsLogDStream: DStream[Ads_log]): DStream[Ads_log] = {
    // 它将每条广告点击日志的用户ID与黑名单表进行对比
    adsLogDStream.transform(rdd => {
      // 如果用户ID存在于黑名单中,说明该用户被标记为黑名单用户,该条广告点击日志将被过滤掉,不会继续处理
      rdd.filter(adsLog => {
        val connection: Connection = JdbcUtil.getConnection
        val bool: Boolean = JdbcUtil.isExist(connection,
          "select * from black_list where userid=?", Array(adsLog.userid))
        connection.close()
        !bool
      })
    })
  }
}

(3)最后则是主程序代码,它读取 Kafka 中的广告点击日志数据流,对数据进行过滤、统计和黑名单处理:
object RealTimeApp {
  def main(args: Array[String]): Unit = {
    //1.创建 SparkConf
    val sparkConf: SparkConf = new
        SparkConf().setMaster("local[*]").setAppName("RealTimeApp")

    //2.创建 StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    //3.读取 Kafka 数据 1583288137305 华南 深圳 4 3
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
      MyKafkaUtil.getKafkaStream("my-topic", ssc)

    //4.将每一行数据转换为样例类对象
    val adsLogDStream: DStream[Ads_log] = kafkaDStream.map(record => {
      //a.取出 value 并按照" "切分
      val arr: Array[String] = record.value().split(" ")
      //b.封装为样例类对象
      Ads_log(arr(0).toLong, arr(1), arr(2), arr(3), arr(4))
    })

    //5.根据 MySQL 中的黑名单过滤当前数据集
    val filterAdsLogDStream: DStream[Ads_log] =
      BlackListHandler.filterByBlackList(adsLogDStream)

    //6.将满足要求的用户写入黑名单
    BlackListHandler.addBlackList(filterAdsLogDStream)

    //7.测试打印
    filterAdsLogDStream.cache()
    filterAdsLogDStream.count().print()

    //8.开启任务
    ssc.start()
    ssc.awaitTermination()
  }
}

5,运行测试

(1)首先启动我们之前编写的数据生成器程序(点击查看),启动后可以看到程序每隔 2 秒便会产生一批数据:

(2)接着启动本文编写的广告黑名单程序,查看 user_ad_count 表可以看到用户点击数据在不断的更新:

(3)稍等一会后,当某用户针对广告点击数到达 30 次时,该用户便会添加到 black_list 表中:
评论

全部评论(0)

回到顶部