返回 导航

Spark

hangge.com

Spark - RDD使用详解4(转换算子3:Key-Value类型)

作者:hangge | 2023-09-30 11:00

六、Key-Value 类型的转换算子

1,partitionBy

(1)该函数将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner,它根据键的哈希值将数据分配到不同的分区中。这样,具有相同键的数据将被分配到相同的分区中,从而便于后续基于键的操作,例如聚合、连接和排序等。
def partitionBy(partitioner: Partitioner): RDD[(K, V)]

(2)下面代码根据键的哈希值将数据分配到不同的分区中:
// 创建RDD(三个分区)
val rdd1: RDD[(Int, String)] = sc.makeRDD(Array((1,"a"),(2,"b"),(3,"c"),(1,"d"),(1,"e"),(2,"f")),3)

// 对 RDD 进行分区(指定一个HashPartitioner对象作为分区器,将数据分为2个分区。)
val rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))

// 打印结果
rdd2.glom().collect().foreach { partition =>
  println(partition.mkString(", "))
}

2,reduceByKey

(1)该函数可以将数据按照相同的 KeyValue 进行聚合。具体操作是按照键对数据进行分组,然后将每个键对应的值进行聚合。它使用用户提供的聚合函数对每个键的值进行合并,以生成一个新的键值对数据集。
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

(2)下面样例对 RDD 进行键值对聚合:
// 创建RDD
val rdd1 = sc.makeRDD(List(("apple", 3), ("banana", 2), ("apple", 5), ("banana", 1)))

// 进行键值对聚合
val rdd2 = rdd1.reduceByKey(_ + _)

// 打印结果
rdd2.collect().foreach(println)

3,groupByKey

(1)该函数将数据源的数据根据 keyvalue 进行分组,,并将具有相同键的所有值收集到一个迭代器中,生成一个新的键值对数据集。
reduceByKey 和 groupByKey 的区别:
  • 从 shuffle 的角度reduceByKeygroupByKey 都存在 shuffle 的操作,但是 reduceByKey 可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。
  • 从功能的角度reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

(2)下面代码对 RDD 进行分组:
// 创建RDD
val rdd1 = sc.makeRDD(List(("apple", 3), ("banana", 2), ("apple", 5), ("banana", 1)))

// 对 RDD 进行分组
val rdd2: RDD[(String, Iterable[Int])] = rdd1.groupByKey()

// 打印结果
rdd2.collect().foreach(println)

4,aggregateByKey

(1)该函数允许我们指定初始值和聚合函数,将数据根据不同的规则进行分区内计算和分区间计算。
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
 combOp: (U, U) => U): RDD[(K, U)]

(2)下面代码取出每个分区内相同 key 的最大值然后分区间相加:
// 创建RDD
val rdd1 = sc.makeRDD(List(
  ("a",1),("a",2),("c",3),
  ("b",4),("c",5),("c",6)
),2)

// 对 RDD 进行聚合
// 分区内取最大值,两个分区的结果数据分别是:("a",2),("c",3) 和 ("b",4),("c",6)
// 分区间数值相加,最终结果是:("a",2),("c",6),("b",4)
val rdd2 = rdd1.aggregateByKey(0)(
  (x, y) => math.max(x,y),
  (x, y) => x + y
)

// 打印结果
rdd2.collect().foreach(println)

5,combineByKey

(1)该函数用于对键值对数据进行聚合操作,并提供了更灵活的聚合方式。它允许我们定义三个函数来指定在分区内、分区间和最终结果合并时的聚合逻辑。
  • combineByKey() 操作按照键对数据进行分组,并对每个键的值进行聚合。
  • 对于每个键,它在分区内首次遇到该键时,使用 createCombiner() 函数将当前值转换为初始聚合值。
  • 在后续遇到相同键的值时,使用 mergeValue() 函数将当前值与已经聚合的值进行合并。
  • 最后,在所有分区上进行合并时,使用 mergeCombiners() 函数将各个分区的聚合结果合并为最终结果。
def combineByKey[C](
 createCombiner: V => C,
 mergeValue: (C, V) => C,
 mergeCombiners: (C, C) => C): RDD[(K, C)]
reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别:
  • reduceByKey:相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
  • FoldByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同
  • AggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
  • CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。

(2)下面代码计算每个 key 对应数值总数,以及 key 出现的次数:
// 创建RDD
val rdd1 = sc.makeRDD(List(
  ("a",1),("a",2),("c",3),
  ("b",4),("c",5),("c",6)
),2)

// 对 RDD 进行聚合
// 1,将当前值转换为初始聚合值(数值,次数1),两个分组值分别是:
//    ("a",(1,1)),("a",(2,1)),("c",(3,1))
//    ("b",(4,1)),("c",(5,1)),("c",(6,1))
// 2,分区内计算,数值累加,次数+1,两个分组值分别是:
//    ("a",(3,2)),("c",(3,1))
//    ("b",(4,1)),("c",(11,2))
// 3,分区间计算,数值累加,次数累加,最终结果是:
//    ("a",(3,2)),("c",(14,3)),("b",(4,1))
val rdd2 = rdd1.combineByKey(
  (_, 1),
  (acc: (Int, Int), value) => (acc._1 + value, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

// 打印结果
rdd2.collect().foreach(println)

(3)对上面代码增加一步操作,则可以计算每个 key 对应的平均值:
// 创建RDD
val rdd1 = sc.makeRDD(List(
  ("a",1),("a",2),("c",3),
  ("b",4),("c",5),("c",6)
),2)

// 对 RDD 进行聚合
// 1,将当前值转换为初始聚合值(数值,次数1),两个分组值分别是:
//    ("a",(1,1)),("a",(2,1)),("c",(3,1))
//    ("b",(4,1)),("c",(5,1)),("c",(6,1))
// 2,分区内计算,数值累加,次数+1,两个分组值分别是:
//    ("a",(3,2)),("c",(3,1))
//    ("b",(4,1)),("c",(11,2))
// 3,分区间计算,数值累加,次数累加,最终结果是:
//    ("a",(3,2)),("c",(14,3)),("b",(4,1))
val rdd2 = rdd1.combineByKey(
  (_, 1),
  (acc: (Int, Int), value) => (acc._1 + value, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

// 计算每个key对应的平均值
val rdd3  =  rdd2.mapValues{ case (sum, count) => sum.toDouble / count }

// 打印结果
rdd3.collect().foreach(println)

6,sortByKey

(1)该函数用于按照键对键值对数据进行排序。默认情况下,排序是升序的,将 ascending 设置为 false 则为降序。
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)]

(2)下面代码将数据按照 key 进行排序:
// 创建RDD
val rdd1 = sc.makeRDD(List(("c", 3), ("b", 2), ("a", 1), ("d", 4)))

// 按照 key 进行排序
val rdd2 = rdd1.sortByKey()

// 打印结果
rdd2.collect().foreach(println)

7,join

(1)该函数用于将两个数据集按照键进行连接操作。它根据两个数据集的共同键将它们的记录进行匹配,并返回连接后的结果。
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

(2)下面代码对两个 RDD 进行连接:
// 创建RDD
val rdd1 = sc.makeRDD(List(("apple", 3), ("banana", 2), ("orange", 5)))
val rdd2 = sc.makeRDD(List(("apple", "red"), ("banana", "yellow"), ("grape", "purple")
  ,("apple", "green")))

// 对两个 RDD 进行连接
val rdd3 = rdd1.join(rdd2)

// 打印结果
rdd3.collect().foreach(println)

8,leftOuterJoin

(1)该函数用于执行左外连接操作,即保留左侧数据集的所有记录,并根据键匹配右侧数据集的记录。如果右侧数据集中没有与左侧数据集匹配的记录,则使用 null 填充。
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

(2)下面代码对两个 RDD 进行左外连接连接:
// 创建RDD
val rdd1 = sc.makeRDD(List(("apple", 3), ("banana", 2), ("orange", 5)))
val rdd2 = sc.makeRDD(List(("apple", "red"), ("banana", "yellow"), ("grape", "purple")
  ,("apple", "green")))

// 对两个 RDD 进行左外连接连接
val rdd3 = rdd1.leftOuterJoin(rdd2)

// 打印结果
rdd3.collect().foreach(println)

9,cogroup

(1)该函数用于将多个数据集按照键进行聚合操作,并返回一个包含聚合结果的数据集。它将具有相同键的记录进行分组,并将每个键对应的所有记录放入一个迭代器中。
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

(2)下面代码我们使用 cogroup() 函数对两个 RDD 进行聚合,根据键将它们的记录进行分组。
// 创建RDD
val rdd1 = sc.makeRDD(List(("apple", 3), ("banana", 2), ("orange", 5)))
val rdd2 = sc.makeRDD(List(("apple", "red"), ("banana", "yellow"), ("grape", "purple")
  ,("apple", "green")))

// 对两个 RDD 进行左外连接连接
val rdd3:RDD[(String, (Iterable[Int], Iterable[String]))] = rdd1.cogroup(rdd2)

// 打印结果
rdd3.collect().foreach{ case (key, (values1, values2)) =>
  println(s"$key: ${values1.mkString(", ")} - ${values2.mkString(", ")}")
}
评论

全部评论(0)

回到顶部