Spark - SparkSQL使用详解5(Kafka的读取与写入)
作者:hangge | 2023-11-27 09:05
五、Kafka 的读取与写入
1,准备工作
首先编辑项目的 pom.xml 文件,添加Kafka相关的依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.3.1</version>
</dependency>
2,输出数据到 Kafka
(1)这种方式输出离线处理的结果, 将已存在的数据分为若干批次进行处理,处理完毕后程序退出。下面代码将处理后的数据已批量的方式写入到 kafka 中:
import org.apache.spark.sql.{DataFrame, SparkSession}
object Hello {
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("HelloStructuredStreaming")
.master("local[*]")
.getOrCreate()
// 导入隐式转换
import spark.implicits._
// 创建一个静态DataFrame,这里通过数组创建
val wordCount: DataFrame = spark.sparkContext
.parallelize(Array("hello hangge", "hi hangge"))
.toDF("word")
.flatMap(_.getString(0).split("\\W+")) // 使用flatMap拆分和展平
.groupBy("value")
.count()
.map(row => row.getString(0) + "," + row.getLong(1))
.toDF("value") // 写入数据时候, 必须有一列 "value"
// 启动查询, 把结果输出至kafka
val query = wordCount.write
.format("kafka") // 输出至kafka
.option("kafka.bootstrap.servers", "192.168.60.9:9092") // kafka 配置
.option("topic", "my-topic") // kafka 主题
.save()
//关闭 Spark
spark.stop()
}
}
(2)执行程序等待运行结束后,查看 kafka 的 my-topic 主题,数据如下:

3,从 Kafka 读取数据
(1)下面代码从 Kafka 中读取上面添加的数据,然后进行处理并打印:
注意:读取数据时我们可以设置消费的起始偏移量和结束偏移量。如果不设置 checkpoint 的情况下, 默认起始偏移量 earliest, 结束偏移量为 latest.
import org.apache.spark.sql.{DataFrame, SparkSession}
object Hello {
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("Hello")
.master("local[*]")
.getOrCreate()
// 导入隐式转换
import spark.implicits._
// 创建一个流式DataFrame,这里从Kafka中读取数据
val lines: DataFrame = spark.read // 使用 read 方法,而不是 readStream 方法
.format("kafka") // 设置 kafka 数据源
.option("kafka.bootstrap.servers", "192.168.60.9:9092")
.option("subscribe", "my-topic") // 也可以订阅多个主题 "topic1,topic2"
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load
// 数据处理
val wordCounts = lines
.select("value").as[String] // 将每条消息的 value 取出(不需要key)
.map{ value => {
val arr = value.split(",")
(arr(0), arr(1))
}
}
.toDF("Word", "Count") // 添加列名
// 把结果打印到控制台
wordCounts.show()
//关闭 Spark
spark.stop()
}
}
(2)程序运行后,控制台输出如下内容:
全部评论(0)