Spark - Spark Streaming使用详解5(DStream无状态转换操作)
作者:hangge | 2023-12-12 08:59
在前面的文章中,我介绍了 Spark Streaming 的基本概念、输入源、高级数据源和自定义数据源的内容。本文我将介绍 DStream 的无状态转换操作,这些操作不需要维护状态信息,适用于那些每个批次之间独立处理的情况。
五、DStream 无状态转换操作
1,基本介绍
- 无状态转换操作是指那些不需要维护状态信息的转换操作,每个批次之间相互独立,不依赖于之前的数据处理结果。即把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。
- 无状态转换操作非常适合于一些独立的数据转换和过滤,比如映射、过滤、扁平化等操作。
2,映射操作(map)
(1)map 操作对 DStream 中的每个元素应用一个函数,并返回一个新的 DStream,其中包含函数应用后的结果。
// 假设我们有一个输入DStream
val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)
// 使用map操作将每个元素转换为其长度
val lengthDStream: DStream[Int] = inputDStream.map(_.length)
// 打印结果
lengthDStream.print()
(2)假设我们输入是 hello hangge.com,则输出如下内容:

3,扁平化操作(flatMap)
(1)flatMap 操作类似于 map 操作,同样是对 DStream 中的每个元素应用一个函数,并返回一个新的 DStream。但是它的结果是一个扁平化的 DStream,即每个输入元素可以映射到零个、一个或多个输出元素。
// 假设我们有一个输入DStream
val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)
// 使用flatMap操作将每个元素拆分成单词
val wordsDStream: DStream[String] = inputDStream.flatMap(_.split(" "))
// 打印结果
wordsDStream.print()
(2)假设我们输入是 hello hangge.com,则输出如下内容:

4,过滤操作(filter)
(1)filter 操作用于在 DStream 中过滤出满足特定条件的元素:
// 假设我们有一个输入DStream
val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)
// 将输入DStream中的字符串元素转换为整数
val intDStream: DStream[Int] = inputDStream.map(_.toInt)
// 使用filter操作过滤出偶数元素
val evenDStream: DStream[Int] = intDStream.filter(_ % 2 == 0)
// 打印结果
evenDStream.print()
(2)假设我们依次输入 1、2、3、4、5,则输出如下内容:

5,重新分区操作(repartition)
repartition 操作用于重新分区 DStream 中的数据,以便更有效地进行数据处理和并行计算,类似于 RDD 批处理中的 repartition。通过重新分区,我们可以调整数据的分布,使得后续的处理可以更加均衡和高效。
注意:repartition 操作可能会引起数据的洗牌(shuffle),因此它可能会产生一些性能开销。通常情况下,我们应该在确实需要改变数据分区的情况下使用 repartition,以便在并行计算和负载均衡方面获得更好的效果。
// 假设我们有一个输入DStream
val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)
// 使用repartition操作重新分区DStream中的数据
val repartitionedDStream = inputDStream.repartition(4) // 假设我们要分成4个分区
// 输出结果
repartitionedDStream.print()
6,聚合操作(reduceByKey)
(1)reduceByKey 操作用于对具有相同键的元素进行聚合操作。
// 假设我们有一个输入DStream
val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)
// 将输入DStream中的元素转换为键值对数据 (word, count)
val keyValueDStream = inputDStream.map(line => (line, 1))
// 使用reduceByKey操作对相同键的值进行求和
val sumByKeyDStream = keyValueDStream.reduceByKey(_ + _)
// 打印结果
sumByKeyDStream.print()
(2)假设我们依次输入 a、b、a,则输出如下内容:

7,聚合操作(countByValue)
(1)countByValue 操作用于统计每个元素在 DStream 中出现的次数:
// 假设我们有一个输入DStream
val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)
// 使用countByValue操作统计每个元素出现的次数
val countDStream = inputDStream.countByValue()
// 打印结果
countDStream.print()
(2)假设我们依次输入 a、b、a,则输出如下内容:

8,分组操作(groupByKey)
(1)groupByKey 操作对具有相同键的值进行分组,得到新的键值对 DStream,其中每个键都与其对应的值集合相关联。
// 假设我们有一个输入DStream
val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)
// 将输入DStream中的元素转换为键值对数据 (word, count)
val keyValueDStream = inputDStream.map(line => (line, 1))
// 使用groupByKey操作对相同键的值进行分组
val groupedDStream = keyValueDStream.groupByKey()
// 输出结果
groupedDStream.print()
(2)假设我们依次输入 a、b、a,则输出如下内容:

9,转换操作(transform)
(1)transform 操作用于对 DStream 中的每个 RDD 应用任意的 RDD 转换操作,这样我们可以根据需要进行各种自定义的转换。
提示:该函数每一批次调度一次。其实也就是对 DStream 中的 RDD 应用转换。
// 假设我们有一个输入DStream
val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)
// 使用transform操作对每个RDD进行自定义转换,
val transformedDStream: DStream[String] = inputDStream.transform { rdd =>
// 将每个字符串转换为大写
rdd.map(_.toUpperCase())
}
// 输出结果
transformedDStream.print()
(2)假设我们输入是 hello hangge.com,则输出如下内容:

10,链接操作(join)
(1)join 操作用于将两个 DStream 中的数据按照键进行连接,类似于两个 RDD 的 JOIN 操作。该操作允许我们在流式数据中合并具有相同键的数据,从而在不同的 DStream 之间创建关联。
// 创建两个输入DStream,模拟键值对数据
val stream1 = ssc.socketTextStream("localhost", 9999)
val stream2 = ssc.socketTextStream("localhost", 8888)
// 将两个流转换为 KV 类型
val dstream1 = stream1.map(line => {
val parts = line.split(",")
(parts(0), parts(1))
})
val dstream2 = stream2.map(line => {
val parts = line.split(",")
(parts(0), parts(1))
})
// 使用join操作将具有相同键的数据合并
val joinedDStream = dstream1.join(dstream2)
// 输出结果
joinedDStream.print()
(2)测试时我们首先打开两个终端分别运行如下命令启动两个 TCP socket:
nc -lk 9999 nc -lk 8888
(3)接着启动 Spark Streaming 程序后,我们在两个终端中分别输入如下内容:

(4)Spark Streaming 应用程序这边将会实时处理输入的数据并输出结果:
全部评论(0)