返回 导航

Spark

hangge.com

Spark - Spark Streaming使用详解5(DStream无状态转换操作)

作者:hangge | 2023-12-12 08:59
    在前面的文章中,我介绍了 Spark Streaming 的基本概念、输入源、高级数据源和自定义数据源的内容。本文我将介绍 DStream 的无状态转换操作,这些操作不需要维护状态信息,适用于那些每个批次之间独立处理的情况。

五、DStream 无状态转换操作

1,基本介绍

  • 无状态转换操作是指那些不需要维护状态信息的转换操作,每个批次之间相互独立,不依赖于之前的数据处理结果。即把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD
  • 无状态转换操作非常适合于一些独立的数据转换和过滤,比如映射、过滤、扁平化等操作。

2,映射操作(map)

(1)map 操作对 DStream 中的每个元素应用一个函数,并返回一个新的 DStream,其中包含函数应用后的结果。
// 假设我们有一个输入DStream
val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)

// 使用map操作将每个元素转换为其长度
val lengthDStream: DStream[Int] = inputDStream.map(_.length)

// 打印结果
lengthDStream.print()

(2)假设我们输入是 hello hangge.com,则输出如下内容:

3,扁平化操作(flatMap)

(1)flatMap 操作类似于 map 操作,同样是对 DStream 中的每个元素应用一个函数,并返回一个新的 DStream。但是它的结果是一个扁平化的 DStream,即每个输入元素可以映射到零个、一个或多个输出元素。
// 假设我们有一个输入DStream
val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)

// 使用flatMap操作将每个元素拆分成单词
val wordsDStream: DStream[String] = inputDStream.flatMap(_.split(" "))

// 打印结果
wordsDStream.print()

(2)假设我们输入是 hello hangge.com,则输出如下内容:

4,过滤操作(filter)

(1)filter 操作用于在 DStream 中过滤出满足特定条件的元素:
// 假设我们有一个输入DStream
val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)

// 将输入DStream中的字符串元素转换为整数
val intDStream: DStream[Int] = inputDStream.map(_.toInt)

    // 使用filter操作过滤出偶数元素
val evenDStream: DStream[Int] = intDStream.filter(_ % 2 == 0)

// 打印结果
evenDStream.print()

(2)假设我们依次输入 12345,则输出如下内容:

5,重新分区操作(repartition)

    repartition 操作用于重新分区 DStream 中的数据,以便更有效地进行数据处理和并行计算,类似于 RDD 批处理中的 repartition。通过重新分区,我们可以调整数据的分布,使得后续的处理可以更加均衡和高效。
注意repartition 操作可能会引起数据的洗牌(shuffle),因此它可能会产生一些性能开销。通常情况下,我们应该在确实需要改变数据分区的情况下使用 repartition,以便在并行计算和负载均衡方面获得更好的效果。
// 假设我们有一个输入DStream
val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)

// 使用repartition操作重新分区DStream中的数据
val repartitionedDStream = inputDStream.repartition(4) // 假设我们要分成4个分区

// 输出结果
repartitionedDStream.print()

6,聚合操作(reduceByKey)

(1)reduceByKey 操作用于对具有相同键的元素进行聚合操作。
// 假设我们有一个输入DStream
val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)

// 将输入DStream中的元素转换为键值对数据 (word, count)
val keyValueDStream = inputDStream.map(line => (line, 1))

// 使用reduceByKey操作对相同键的值进行求和
val sumByKeyDStream = keyValueDStream.reduceByKey(_ + _)

// 打印结果
sumByKeyDStream.print()

(2)假设我们依次输入 aba,则输出如下内容:

7,聚合操作(countByValue)

(1)countByValue 操作用于统计每个元素在 DStream 中出现的次数:
// 假设我们有一个输入DStream
val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)

// 使用countByValue操作统计每个元素出现的次数
val countDStream = inputDStream.countByValue()

// 打印结果
countDStream.print()

(2)假设我们依次输入 aba,则输出如下内容:

8,分组操作(groupByKey)

(1)groupByKey 操作对具有相同键的值进行分组,得到新的键值对 DStream,其中每个键都与其对应的值集合相关联。
// 假设我们有一个输入DStream
val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)

// 将输入DStream中的元素转换为键值对数据 (word, count)
val keyValueDStream = inputDStream.map(line => (line, 1))

// 使用groupByKey操作对相同键的值进行分组
val groupedDStream = keyValueDStream.groupByKey()

// 输出结果
groupedDStream.print()

(2)假设我们依次输入 aba,则输出如下内容:

9,转换操作(transform)

(1)transform 操作用于对 DStream 中的每个 RDD 应用任意的 RDD 转换操作,这样我们可以根据需要进行各种自定义的转换。
提示:该函数每一批次调度一次。其实也就是对 DStream 中的 RDD 应用转换。
// 假设我们有一个输入DStream
val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999)

// 使用transform操作对每个RDD进行自定义转换,
val transformedDStream: DStream[String] = inputDStream.transform { rdd =>
  // 将每个字符串转换为大写
  rdd.map(_.toUpperCase())
}

// 输出结果
transformedDStream.print()

(2)假设我们输入是 hello hangge.com,则输出如下内容:

10,链接操作(join)

(1)join 操作用于将两个 DStream 中的数据按照键进行连接,类似于两个 RDDJOIN 操作。该操作允许我们在流式数据中合并具有相同键的数据,从而在不同的 DStream 之间创建关联。
// 创建两个输入DStream,模拟键值对数据
val stream1 = ssc.socketTextStream("localhost", 9999)
val stream2 = ssc.socketTextStream("localhost", 8888)

// 将两个流转换为 KV 类型
val dstream1 = stream1.map(line => {
  val parts = line.split(",")
  (parts(0), parts(1))
})

val dstream2 = stream2.map(line => {
  val parts = line.split(",")
  (parts(0), parts(1))
})

// 使用join操作将具有相同键的数据合并
val joinedDStream = dstream1.join(dstream2)

// 输出结果
joinedDStream.print()

(2)测试时我们首先打开两个终端分别运行如下命令启动两个 TCP socket
nc -lk 9999
nc -lk 8888

(3)接着启动 Spark Streaming 程序后,我们在两个终端中分别输入如下内容:

(4)Spark Streaming 应用程序这边将会实时处理输入的数据并输出结果:
评论

全部评论(0)

回到顶部