Spark - Spark Streaming使用详解3(高级数据源:Kafka数据源)
作者:hangge | 2023-12-06 08:40
Kafka 作为一个高性能的消息队列系统,为实时数据流的传输和处理提供了强大的支持。下面我将介绍如何使用 Spark Streaming 与 Kafka 集成,实现从 Kafka 主题中读取数据并进行简单的实时统计分析。
三、使用 Kafka 作为输入数据源
1,添加依赖
首先编辑项目的 pom.xml 文件,添加 Kafka 相关的依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.13</artifactId>
<version>3.4.0</version>
</dependency>
2,编写消费者代码
下面代码通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简单的 word count 计算,最终打印到控制台。
object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 初始化 StreamingContext,设置微批处理的时间间隔为 3 秒
val ssc = new StreamingContext(sparkConf, Seconds(3))
val topic = "my-topic" // Kafka主题
val brokers = "192.168.60.9:9092" // Kafka集群地址
//定义 Kafka 参数
val kafkaPara: Map[String, Object] = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> topic,
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
)
// 读取 Kafka 数据创建 DStream
val stream: InputDStream[ConsumerRecord[String, String]] =
KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set(topic), kafkaPara))
// Word Count统计
val wordCounts = stream
.map(record => record.value()) // 将每条消息的 value 取出(不需要key)
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
// 打印Word Count结果
wordCounts.print()
// 开启任务
ssc.start()
// 等待应用程序终止
ssc.awaitTermination()
}
}
3,编写生产者代码
为了方便测试,我们还编写了如下生产者代码,它每隔 1 秒向指定主题发送包含多个随机单词的随机字符串,单词之间用空格隔开。
object KafkaProducer {
def main(args: Array[String]): Unit = {
val topic = "my-topic" // Kafka主题
val brokers = "192.168.60.9:9092" // Kafka集群地址
// 配置Kafka生产者
val props = new Properties()
props.put("bootstrap.servers", brokers)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
// 生成随机单词的列表
val randomWords = List("apple", "banana", "orange", "grape", "kiwi", "pear", "mango")
// 每隔1秒发送一个包含多个随机单词的随机字符串到Kafka主题
while (true) {
val randomWordCount = Random.nextInt(5) + 1 // 生成1到5个随机单词
val randomString = (1 to randomWordCount)
.map(_ => randomWords(Random.nextInt(randomWords.length)))
.mkString(" ")
val record = new ProducerRecord[String, String](topic, randomString)
producer.send(record)
println("数据发送完毕:" + randomString)
Thread.sleep(1000)
}
producer.close()
}
}
4,运行测试
(1)首先我们要准备启动 Kafka 服务,具体可以参考我之前的文章:
(2)接着启动我们编写的生产者程序,启动后可以看到程序每隔 1 秒便会向指定主题发送一个随机字符串:

(3)然后启动我们编写的消费者程序,可以看到控制台输出如下内容,说明我们使用 Spark Streaming 来对 Kafka 主题中的数据进行 Word Count 统计这个功能实现成功。
全部评论(0)