返回 导航

Spark

hangge.com

Spark - Spark Streaming使用详解3(高级数据源:Kafka数据源)

作者:hangge | 2023-12-06 08:40
    Kafka 作为一个高性能的消息队列系统,为实时数据流的传输和处理提供了强大的支持。下面我将介绍如何使用 Spark Streaming Kafka 集成,实现从 Kafka 主题中读取数据并进行简单的实时统计分析。

三、使用 Kafka 作为输入数据源

1,添加依赖

首先编辑项目的 pom.xml 文件,添加 Kafka 相关的依赖:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.13</artifactId>
    <version>3.4.0</version>
</dependency>

2,编写消费者代码

下面代码通过 SparkStreamingKafka 读取数据,并将读取过来的数据做简单的 word count 计算,最终打印到控制台。
object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")

    // 初始化 StreamingContext,设置微批处理的时间间隔为 3 秒
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val topic = "my-topic" // Kafka主题
    val brokers = "192.168.60.9:9092" // Kafka集群地址

    //定义 Kafka 参数
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> topic,
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )
    // 读取 Kafka 数据创建 DStream
    val stream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set(topic), kafkaPara))

    // Word Count统计
    val wordCounts = stream
      .map(record => record.value()) // 将每条消息的 value 取出(不需要key)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    // 打印Word Count结果
    wordCounts.print()

    // 开启任务
    ssc.start()

    // 等待应用程序终止
    ssc.awaitTermination()
  }
}

3,编写生产者代码

    为了方便测试,我们还编写了如下生产者代码,它每隔 1 秒向指定主题发送包含多个随机单词的随机字符串,单词之间用空格隔开。
object KafkaProducer {
  def main(args: Array[String]): Unit = {
    val topic = "my-topic" // Kafka主题
    val brokers = "192.168.60.9:9092" // Kafka集群地址

    // 配置Kafka生产者
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // 生成随机单词的列表
    val randomWords = List("apple", "banana", "orange", "grape", "kiwi", "pear", "mango")

    // 每隔1秒发送一个包含多个随机单词的随机字符串到Kafka主题
    while (true) {
      val randomWordCount = Random.nextInt(5) + 1 // 生成1到5个随机单词
      val randomString = (1 to randomWordCount)
        .map(_ => randomWords(Random.nextInt(randomWords.length)))
        .mkString(" ")

      val record = new ProducerRecord[String, String](topic, randomString)
      producer.send(record)
      println("数据发送完毕:" + randomString)
      Thread.sleep(1000)
    }

    producer.close()
  }
}

4,运行测试
(1)首先我们要准备启动 Kafka 服务,具体可以参考我之前的文章:

(2)接着启动我们编写的生产者程序,启动后可以看到程序每隔 1 秒便会向指定主题发送一个随机字符串:

(3)然后启动我们编写的消费者程序,可以看到控制台输出如下内容,说明我们使用 Spark Streaming 来对 Kafka 主题中的数据进行 Word Count 统计这个功能实现成功。
评论

全部评论(0)

回到顶部