返回 导航

Spark

hangge.com

Spark - Spark Streaming使用详解9(DStream数据输出:文件、数据库、消息队列)

作者:hangge | 2023-12-19 08:56
    在 Spark Streaming 中,我们常常需要将处理后的数据发送到各种不同的目的地,比如文件系统、数据库、消息队列等,用于展示、存储或进一步分析。本文将演示一些常见的 DStream 输出操作。

九、DStream 数据输出

1,基本介绍

    输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。
注意:与 RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没有被执行输出操作,那么这些 DStream 就都不会被求值。如果 StreamingContext 中没有设定输出操作,整个 context 就都不会启动。

2,将数据打印出来

(1)print() 函数在运行流程序的驱动节点上打印 DStream 中每一批次数据的最开始 10 个元素。这在开发和调试过程中非常有用。
object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")

    // 初始化 StreamingContext,设置微批处理的时间间隔为 5 秒
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // 从监控端口读取数据流
    val inputDStream = ssc.socketTextStream("localhost", 9999)

    // 打印结果
    inputDStream.print()

    // 启动 StreamingContext
    ssc.start()

    // 等待应用程序终止(遇错停止,否则不断工作)
    ssc.awaitTermination()
  }
}

(2)我们输入 a 回车 b,等待 5 秒,输入 c 回车 d,在应用控制台这边则会打印如下信息:

3,将数据保存到文本文件

(1)saveAsTextFiles(prefix, [suffix]) 方法以 text 文件形式存储 DStream 的内容。每个批次的数据都会被保存为一个单独的文件夹,文件夹名字基于参数中的 prefixsuffix,即“prefix-Time_IN_MS[.suffix]”。每个文件夹中会包含该批次的数据作为文本文件。
object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")

    // 初始化 StreamingContext,设置微批处理的时间间隔为 5 秒
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // 从监控端口读取数据流
    val inputDStream = ssc.socketTextStream("localhost", 9999)

    // 将结果保存到文件中
    inputDStream.saveAsTextFiles("output/streamData", "txt")

    // 启动 StreamingContext
    ssc.start()

    // 等待应用程序终止(遇错停止,否则不断工作)
    ssc.awaitTermination()
  }
}

(2)上面程序运行后可以看到每隔 5 秒就会创建一个文件夹,并且将该批次的数据保存在对应文件夹中:

4,将数据保存为 SequenceFiles

(1)saveAsObjectFiles(prefix, [suffix]) 方法以 Java 对象序列化的方式将 Stream 中的数据保存为 SequenceFiles。每个批次的数据都会被保存为一个单独的文件夹,文件夹名字基于参数中的 prefixsuffix,即“prefix-Time_IN_MS[.suffix]”。每个文件夹中会包含该批次的数据作为文本文件。
object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")

    // 初始化 StreamingContext,设置微批处理的时间间隔为 5 秒
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // 从监控端口读取数据流
    val inputDStream = ssc.socketTextStream("localhost", 9999)

    // 将结果保存为 SequenceFiles
    inputDStream.saveAsObjectFiles("output/streamData", "seq")

    // 启动 StreamingContext
    ssc.start()

    // 等待应用程序终止(遇错停止,否则不断工作)
    ssc.awaitTermination()
  }
}

(2)上面程序运行后可以看到每隔 5 秒就会创建一个文件夹,并且将该批次的数据保存在对应文件夹中:

5,使用通用的输出操作(foreachRDD)

(1)foreachRDD(func) 是最通用的输出操作,用于对 DStream 中的每个 RDD 运行任意计算。参数传入的函数 func 应该实现将每个 RDD 中的数据推送到外部系统,例如将 RDD 存入文件或通过网络写入数据库。
(2)下面是将一个将 Dstream 数据存到 MySQL 数据库中的样例:
如果要将数据写入数据库,注意创建连接代码位置:
  • 连接不能写在 driver 层面(序列化)
  • 如果写在 foreach 则每个 RDD 中的每一条数据都创建,得不偿失;
  • 增加 foreachPartition,在分区创建(获取)。
object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")

    // 初始化 StreamingContext,设置微批处理的时间间隔为 5 秒
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // 创建一个 DStream,从 localhost:9999 接收输入
    val lines = ssc.socketTextStream("localhost", 9999)
    // 将每行拆分成单词
    val words = lines.flatMap(_.split(" "))
    // 统计每个单词出现的次数
    val wordCounts = words.map((_, 1)).reduceByKey(_ + _)

    // 针对每个 RDD 执行写入数据库操作
    wordCounts.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // 在每个分区内创建数据库连接
        val connection = createConnection()

        partitionOfRecords.foreach { record =>
          // 执行数据插入操作
          writeToDatabase(connection, record)
        }

        connection.close() // 关闭连接
      }
    }

    // 启动 StreamingContext
    ssc.start()

    // 等待应用程序终止(遇错停止,否则不断工作)
    ssc.awaitTermination()
  }

  // 创建数据库连接
  def createConnection(): Connection = {
    val jdbcUrl = "jdbc:mysql://192.168.43.96:3306/hangge"
    val user = "root"
    val password = "hangge1234"
    // 加载 MySQL 驱动程序
    Class.forName("com.mysql.jdbc.Driver")
    // 建立连接
    DriverManager.getConnection(jdbcUrl, user, password)
  }

  // 将数据写入数据库
  def writeToDatabase(connection: Connection, record: (String, Int)): Unit = {
    val sql = "INSERT INTO word_count (word, count) VALUES (?, ?)"
    val statement = connection.prepareStatement(sql)
    // 设置参数
    statement.setString(1, record._1)
    statement.setInt(2, record._2)
    // 执行插入操作
    statement.executeUpdate()
  }
}

(3)下面是一个将 DStream 数据发送到 Kafka 的样例:
注意:同上面创建数据库连接一样,生产者对象要放在分区创建。
object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")

    // 初始化 StreamingContext,设置微批处理的时间间隔为 5 秒
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // 创建一个 DStream,从 localhost:9999 接收输入
    val lines = ssc.socketTextStream("localhost", 9999)
    // 将每行拆分成单词
    val words = lines.flatMap(_.split(" "))
    // 统计每个单词出现的次数
    val wordCounts = words.map((_, 1)).reduceByKey(_ + _)

    // Kafka 生产者配置
    val props = new java.util.Properties()
    props.put("bootstrap.servers", "192.168.60.9:9092") // Kafka 服务器地址
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // 针对每个 RDD 执行发送消息到 Kafka 操作
    wordCounts.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // 在每个分区内创建Kafka生产者
        val producer = new KafkaProducer[String, String](props)

        partitionOfRecords.foreach { record =>
          // 构建消息,发送到 Kafka 主题
          val message = new ProducerRecord[String, String]("your_topic",
            record._1, record._2.toString)
          producer.send(message)
        }

        // 关闭 Kafka 生产者
        producer.close()
      }
    }

    // 启动 StreamingContext
    ssc.start()

    // 等待应用程序终止(遇错停止,否则不断工作)
    ssc.awaitTermination()
  }
}
评论

全部评论(0)

回到顶部