Spark - Structured Streaming使用详解4(输入源3:Kafka)
作者:hangge | 2024-01-12 08:45
五、Kafka 输入源
1,准备工作
(1)首先编辑项目的 pom.xml 文件,添加 Kafka 相关的依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.3.1</version>
</dependency>
(2)为了方便测试,我们还编写了如下生产者代码,它每隔 1 秒向指定主题发送包含多个随机单词的随机字符串,单词之间用空格隔开。
object KafkaProducer {
def main(args: Array[String]): Unit = {
val topic = "my-topic" // Kafka主题
val brokers = "192.168.60.9:9092" // Kafka集群地址
// 配置Kafka生产者
val props = new Properties()
props.put("bootstrap.servers", brokers)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
// 生成随机单词的列表
val randomWords = List("apple", "banana", "orange", "grape", "kiwi", "pear", "mango")
// 每隔1秒发送一个包含多个随机单词的随机字符串到Kafka主题
while (true) {
val randomWordCount = Random.nextInt(5) + 1 // 生成1到5个随机单词
val randomString = (1 to randomWordCount)
.map(_ => randomWords(Random.nextInt(randomWords.length)))
.mkString(" ")
val record = new ProducerRecord[String, String](topic, randomString)
producer.send(record)
println("数据发送完毕:" + randomString)
Thread.sleep(1000)
}
producer.close()
}
}
2,使用样例
(1)下面代码 Structured Streaming 会连续不断地从 Kafka 主题中读取新的消息,并将这些消息作为微批次处理,然后将结果写入下游:
import org.apache.spark.sql.{DataFrame, SparkSession}
object Hello {
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("Hello")
.master("local[*]")
.getOrCreate()
// 导入隐式转换
import spark.implicits._
// 创建一个流式DataFrame,这里从Kafka中读取数据
val lines: DataFrame = spark.readStream
.format("kafka") // 设置 kafka 数据源
.option("kafka.bootstrap.servers", "192.168.60.9:9092")
.option("subscribe", "my-topic") // 也可以订阅多个主题 "topic1,topic2"
.load
// 启动查询, 把结果打印到控制台
val query = lines.writeStream
.outputMode("update") // 使用update输出模式
.format("console")
.start()
// 等待应用程序终止
query.awaitTermination()
//关闭 Spark
spark.stop()
}
}

(2)上面代码收到什么数据我们就输出什么数据,下面我们做个改进,对输入数据进行 word count 操作,并将结果输出到控制台:
import org.apache.spark.sql.{DataFrame, SparkSession}
object Hello {
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("Hello")
.master("local[*]")
.getOrCreate()
// 导入隐式转换
import spark.implicits._
// 创建一个流式DataFrame,这里从Kafka中读取数据
val lines: DataFrame = spark.readStream
.format("kafka") // 设置 kafka 数据源
.option("kafka.bootstrap.servers", "192.168.60.9:9092")
.option("subscribe", "my-topic") // 也可以订阅多个主题 "topic1,topic2"
.load
// Word Count 统计
val wordCounts = lines
.select("value").as[String] // 将每条消息的 value 取出(不需要key)
.flatMap(_.split("\\W+")) // 拆分出单词
.groupBy("value") // 分组
.count()
// 启动查询, 把结果打印到控制台
val query = wordCounts.writeStream
.outputMode("complete") // 使用complete输出模式
.format("console")
.start()
// 等待应用程序终止
query.awaitTermination()
//关闭 Spark
spark.stop()
}
}
全部评论(0)