返回 导航

Spark

hangge.com

Spark - SparkSQL使用详解5(Kafka的读取与写入)

作者:hangge | 2023-11-27 09:05

五、Kafka 的读取与写入

1,准备工作

首先编辑项目的 pom.xml 文件,添加Kafka相关的依赖:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.3.1</version>
</dependency>

2,输出数据到 Kafka

(1)这种方式输出离线处理的结果, 将已存在的数据分为若干批次进行处理,处理完毕后程序退出。下面代码将处理后的数据已批量的方式写入到 kafka 中:
import org.apache.spark.sql.{DataFrame, SparkSession}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("HelloStructuredStreaming")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建一个静态DataFrame,这里通过数组创建
    val wordCount: DataFrame = spark.sparkContext
      .parallelize(Array("hello hangge", "hi hangge"))
      .toDF("word")
      .flatMap(_.getString(0).split("\\W+")) // 使用flatMap拆分和展平
      .groupBy("value")
      .count()
      .map(row => row.getString(0) + "," + row.getLong(1))
      .toDF("value")  // 写入数据时候, 必须有一列 "value"

    // 启动查询, 把结果输出至kafka
    val query = wordCount.write
      .format("kafka") // 输出至kafka
      .option("kafka.bootstrap.servers", "192.168.60.9:9092") // kafka 配置
      .option("topic", "my-topic") // kafka 主题
      .save()

    //关闭 Spark
    spark.stop()
  }
}

(2)执行程序等待运行结束后,查看 kafka my-topic 主题,数据如下:

3,从 Kafka 读取数据

(1)下面代码从 Kafka 中读取上面添加的数据,然后进行处理并打印:
注意:读取数据时我们可以设置消费的起始偏移量和结束偏移量。如果不设置 checkpoint 的情况下, 默认起始偏移量 earliest, 结束偏移量为 latest.
import org.apache.spark.sql.{DataFrame, SparkSession}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("Hello")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建一个流式DataFrame,这里从Kafka中读取数据
    val lines: DataFrame = spark.read  // 使用 read 方法,而不是 readStream 方法
      .format("kafka") // 设置 kafka 数据源
      .option("kafka.bootstrap.servers", "192.168.60.9:9092")
      .option("subscribe", "my-topic") // 也可以订阅多个主题 "topic1,topic2"
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load

    // 数据处理
    val wordCounts = lines
      .select("value").as[String] // 将每条消息的 value 取出(不需要key)
      .map{ value => {
          val arr = value.split(",")
          (arr(0), arr(1))
        }
      }
      .toDF("Word", "Count") // 添加列名

    // 把结果打印到控制台
    wordCounts.show()

    //关闭 Spark
    spark.stop()
  }
}

(2)程序运行后,控制台输出如下内容:
评论

全部评论(0)

回到顶部