Spark - Structured Streaming使用详解4(输入源3:Kafka)
作者:hangge | 2024-01-12 08:45
五、Kafka 输入源
1,准备工作
(1)首先编辑项目的 pom.xml 文件,添加 Kafka 相关的依赖:
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.12</artifactId> <version>3.3.1</version> </dependency>
(2)为了方便测试,我们还编写了如下生产者代码,它每隔 1 秒向指定主题发送包含多个随机单词的随机字符串,单词之间用空格隔开。
object KafkaProducer { def main(args: Array[String]): Unit = { val topic = "my-topic" // Kafka主题 val brokers = "192.168.60.9:9092" // Kafka集群地址 // 配置Kafka生产者 val props = new Properties() props.put("bootstrap.servers", brokers) props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") val producer = new KafkaProducer[String, String](props) // 生成随机单词的列表 val randomWords = List("apple", "banana", "orange", "grape", "kiwi", "pear", "mango") // 每隔1秒发送一个包含多个随机单词的随机字符串到Kafka主题 while (true) { val randomWordCount = Random.nextInt(5) + 1 // 生成1到5个随机单词 val randomString = (1 to randomWordCount) .map(_ => randomWords(Random.nextInt(randomWords.length))) .mkString(" ") val record = new ProducerRecord[String, String](topic, randomString) producer.send(record) println("数据发送完毕:" + randomString) Thread.sleep(1000) } producer.close() } }
2,使用样例
(1)下面代码 Structured Streaming 会连续不断地从 Kafka 主题中读取新的消息,并将这些消息作为微批次处理,然后将结果写入下游:
import org.apache.spark.sql.{DataFrame, SparkSession} object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("Hello") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 创建一个流式DataFrame,这里从Kafka中读取数据 val lines: DataFrame = spark.readStream .format("kafka") // 设置 kafka 数据源 .option("kafka.bootstrap.servers", "192.168.60.9:9092") .option("subscribe", "my-topic") // 也可以订阅多个主题 "topic1,topic2" .load // 启动查询, 把结果打印到控制台 val query = lines.writeStream .outputMode("update") // 使用update输出模式 .format("console") .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
(2)上面代码收到什么数据我们就输出什么数据,下面我们做个改进,对输入数据进行 word count 操作,并将结果输出到控制台:
import org.apache.spark.sql.{DataFrame, SparkSession} object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("Hello") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 创建一个流式DataFrame,这里从Kafka中读取数据 val lines: DataFrame = spark.readStream .format("kafka") // 设置 kafka 数据源 .option("kafka.bootstrap.servers", "192.168.60.9:9092") .option("subscribe", "my-topic") // 也可以订阅多个主题 "topic1,topic2" .load // Word Count 统计 val wordCounts = lines .select("value").as[String] // 将每条消息的 value 取出(不需要key) .flatMap(_.split("\\W+")) // 拆分出单词 .groupBy("value") // 分组 .count() // 启动查询, 把结果打印到控制台 val query = wordCounts.writeStream .outputMode("complete") // 使用complete输出模式 .format("console") .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
全部评论(0)