Spark - Spark Streaming使用详解9(DStream数据输出:文件、数据库、消息队列)
作者:hangge | 2023-12-19 08:56
在 Spark Streaming 中,我们常常需要将处理后的数据发送到各种不同的目的地,比如文件系统、数据库、消息队列等,用于展示、存储或进一步分析。本文将演示一些常见的 DStream 输出操作。
九、DStream 数据输出
1,基本介绍
输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。
注意:与 RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没有被执行输出操作,那么这些 DStream 就都不会被求值。如果 StreamingContext 中没有设定输出操作,整个 context 就都不会启动。
2,将数据打印出来
(1)print() 函数在运行流程序的驱动节点上打印 DStream 中每一批次数据的最开始 10 个元素。这在开发和调试过程中非常有用。
object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 初始化 StreamingContext,设置微批处理的时间间隔为 5 秒
val ssc = new StreamingContext(sparkConf, Seconds(5))
// 从监控端口读取数据流
val inputDStream = ssc.socketTextStream("localhost", 9999)
// 打印结果
inputDStream.print()
// 启动 StreamingContext
ssc.start()
// 等待应用程序终止(遇错停止,否则不断工作)
ssc.awaitTermination()
}
}
(2)我们输入 a 回车 b,等待 5 秒,输入 c 回车 d,在应用控制台这边则会打印如下信息:

3,将数据保存到文本文件
(1)saveAsTextFiles(prefix, [suffix]) 方法以 text 文件形式存储 DStream 的内容。每个批次的数据都会被保存为一个单独的文件夹,文件夹名字基于参数中的 prefix 和 suffix,即“prefix-Time_IN_MS[.suffix]”。每个文件夹中会包含该批次的数据作为文本文件。
object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 初始化 StreamingContext,设置微批处理的时间间隔为 5 秒
val ssc = new StreamingContext(sparkConf, Seconds(5))
// 从监控端口读取数据流
val inputDStream = ssc.socketTextStream("localhost", 9999)
// 将结果保存到文件中
inputDStream.saveAsTextFiles("output/streamData", "txt")
// 启动 StreamingContext
ssc.start()
// 等待应用程序终止(遇错停止,否则不断工作)
ssc.awaitTermination()
}
}
(2)上面程序运行后可以看到每隔 5 秒就会创建一个文件夹,并且将该批次的数据保存在对应文件夹中:

4,将数据保存为 SequenceFiles
(1)saveAsObjectFiles(prefix, [suffix]) 方法以 Java 对象序列化的方式将 Stream 中的数据保存为 SequenceFiles。每个批次的数据都会被保存为一个单独的文件夹,文件夹名字基于参数中的 prefix 和 suffix,即“prefix-Time_IN_MS[.suffix]”。每个文件夹中会包含该批次的数据作为文本文件。
object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 初始化 StreamingContext,设置微批处理的时间间隔为 5 秒
val ssc = new StreamingContext(sparkConf, Seconds(5))
// 从监控端口读取数据流
val inputDStream = ssc.socketTextStream("localhost", 9999)
// 将结果保存为 SequenceFiles
inputDStream.saveAsObjectFiles("output/streamData", "seq")
// 启动 StreamingContext
ssc.start()
// 等待应用程序终止(遇错停止,否则不断工作)
ssc.awaitTermination()
}
}
(2)上面程序运行后可以看到每隔 5 秒就会创建一个文件夹,并且将该批次的数据保存在对应文件夹中:

5,使用通用的输出操作(foreachRDD)
(1)foreachRDD(func) 是最通用的输出操作,用于对 DStream 中的每个 RDD 运行任意计算。参数传入的函数 func 应该实现将每个 RDD 中的数据推送到外部系统,例如将 RDD 存入文件或通过网络写入数据库。
(2)下面是将一个将 Dstream 数据存到 MySQL 数据库中的样例:
如果要将数据写入数据库,注意创建连接代码位置:
- 连接不能写在 driver 层面(序列化)
- 如果写在 foreach 则每个 RDD 中的每一条数据都创建,得不偿失;
- 增加 foreachPartition,在分区创建(获取)。
object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 初始化 StreamingContext,设置微批处理的时间间隔为 5 秒
val ssc = new StreamingContext(sparkConf, Seconds(5))
// 创建一个 DStream,从 localhost:9999 接收输入
val lines = ssc.socketTextStream("localhost", 9999)
// 将每行拆分成单词
val words = lines.flatMap(_.split(" "))
// 统计每个单词出现的次数
val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
// 针对每个 RDD 执行写入数据库操作
wordCounts.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// 在每个分区内创建数据库连接
val connection = createConnection()
partitionOfRecords.foreach { record =>
// 执行数据插入操作
writeToDatabase(connection, record)
}
connection.close() // 关闭连接
}
}
// 启动 StreamingContext
ssc.start()
// 等待应用程序终止(遇错停止,否则不断工作)
ssc.awaitTermination()
}
// 创建数据库连接
def createConnection(): Connection = {
val jdbcUrl = "jdbc:mysql://192.168.43.96:3306/hangge"
val user = "root"
val password = "hangge1234"
// 加载 MySQL 驱动程序
Class.forName("com.mysql.jdbc.Driver")
// 建立连接
DriverManager.getConnection(jdbcUrl, user, password)
}
// 将数据写入数据库
def writeToDatabase(connection: Connection, record: (String, Int)): Unit = {
val sql = "INSERT INTO word_count (word, count) VALUES (?, ?)"
val statement = connection.prepareStatement(sql)
// 设置参数
statement.setString(1, record._1)
statement.setInt(2, record._2)
// 执行插入操作
statement.executeUpdate()
}
}
(3)下面是一个将 DStream 数据发送到 Kafka 的样例:
注意:同上面创建数据库连接一样,生产者对象要放在分区创建。
object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 初始化 StreamingContext,设置微批处理的时间间隔为 5 秒
val ssc = new StreamingContext(sparkConf, Seconds(5))
// 创建一个 DStream,从 localhost:9999 接收输入
val lines = ssc.socketTextStream("localhost", 9999)
// 将每行拆分成单词
val words = lines.flatMap(_.split(" "))
// 统计每个单词出现的次数
val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
// Kafka 生产者配置
val props = new java.util.Properties()
props.put("bootstrap.servers", "192.168.60.9:9092") // Kafka 服务器地址
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
// 针对每个 RDD 执行发送消息到 Kafka 操作
wordCounts.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// 在每个分区内创建Kafka生产者
val producer = new KafkaProducer[String, String](props)
partitionOfRecords.foreach { record =>
// 构建消息,发送到 Kafka 主题
val message = new ProducerRecord[String, String]("your_topic",
record._1, record._2.toString)
producer.send(message)
}
// 关闭 Kafka 生产者
producer.close()
}
}
// 启动 StreamingContext
ssc.start()
// 等待应用程序终止(遇错停止,否则不断工作)
ssc.awaitTermination()
}
}
全部评论(0)