Spark - 性能优化详解6(算子优化)
作者:hangge | 2024-08-01 08:49
六、算子优化1:map vs mapPartitions
1,基本介绍
(1)map 操作:对 RDD 中的每个元素进行操作,一次处理一条数据。
(2)mapPartitions 操作:对 RDD 中每个 partition 进行操作,一次处理一个分区的数据。
2,二者对比
(1)OOM 方面对比:
- map 操作: 执行 1 次 map 算子只处理 1 个元素,如果 partition 中的元素较多,假设当前已经处理了 1000 个元素,在内存不足的情况下,Spark 可以通过 GC 等方法回收内存(比如将已处理掉的 1000 个元素从内存中回收)。因此, map 操作通常不会导致 OOM 异常;
- mapPartitions 操作: 执行 1 次 map 算子需要接收该 partition 中的所有元素,因此一旦元素很多而内存不足,就容易导致 OOM 的异常。也不是说一定就会产生 OOM 异常,只是和 map 算子对比的话,相对来说容易产生 OOM 异常。
(2)性能方面对比:
- mapPartitions 的性能更高;初始化操作、数据库链接等操作适合使用 mapPartitions 操作。
3,使用建议
(1)假设需要将 RDD 中的每个元素写入数据库中,这时候就应该把创建数据库链接的操作放置在 mapPartitions 中,创建数据库链接这个操作本身就是个比较耗时的,如果该操作放在 map 中执行,将会频繁执行,比较耗时且影响数据库的稳定性。
(2)下面是一个相关的样例代码:
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
object MapPartitionsOpScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("MapPartitionsOpScala")
.setMaster("local")
val sc = new SparkContext(conf)
//设置分区数量为2
val dataRDD = sc.parallelize(Array(1,2,3,4,5),2)
//map算子一次处理一条数据
/*val sum = dataRDD.map(item=>{
println("==============")
item * 2
}).reduce( _ + _)*/
//mapPartitions算子一次处理一个分区的数据
val sum = dataRDD.mapPartitions(it=>{
//建议针对初始化链接之类的操作,使用mapPartitions,放在mapPartitions内部
//例如:创建数据库链接,使用mapPartitions可以减少链接创建的次数,提高性能
//注意:创建数据库链接的代码建议放在次数,不要放在Driver端或者it.foreach内部
//数据库链接放在Driver端会导致链接无法序列化,无法传递到对应的task中执行,所以算子在执行时会报错
//数据库链接放在it.foreach()内部还是会创建多个链接,和使用map算子的效果是一样的
println("==================")
val result = new ArrayBuffer[Int]()
//这个foreach是调用的scala里面的函数
it.foreach(item=>{
result.+=(item * 2)
})
//关闭数据库链接
result.toIterator
}).reduce(_ + _)
println("sum:"+sum)
sc.stop()
}
}
七、算子优化2:foreach vs foreachPartition
1,二者对比
(1)foreach:一次处理一条数据
(2)foreachPartition:一次处理一个分区的数据
2,优化建议
(1)同上面一样,初始化操作、数据库链接等操作适合使用 foreachPartition 操作。
提示:foreachPartition 的特性和 mapPartitions 的特性是一样的,唯一的区别就是
mapPartitions 是 transformation 操作(不会立即执行),foreachPartition 是 action 操作(会立即执行)
(2)下面是一个相关的样例代码:
import org.apache.spark.{SparkConf, SparkContext}
object ForeachPartitionOpScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("ForeachPartitionOpScala")
.setMaster("local")
val sc = new SparkContext(conf)
//设置分区数量为2
val dataRDD = sc.parallelize(Array(1,2,3,4,5),2)
//foreachPartition:一次处理一个分区的数据,作用和mapPartitions类似
//唯一的区是mapPartitions是transformation算子,foreachPartition是action算子
dataRDD.foreachPartition(it=>{
//在此处获取数据库链接
println("===============")
it.foreach(item=>{
//在这里使用数据库链接
println(item)
})
//关闭数据库链接
})
sc.stop()
}
}
八、算子优化3:使用repartition对RDD进行重分区
1,repartition 主要应用场景
(1)可以调整 RDD 的并行度
- 针对个别 RDD,如果感觉分区数量不合适,想要调整,可以通过 repartition 进行调整,分区调整了之后,对应的并行度也就可以调整了
(2)可以解决 RDD 中数据倾斜的问题
- 如果 RDD 中不同分区之间的数据出现了数据倾斜,可以通过 repartition 实现数据重新分发,可以均匀分发到不同分区中
2,使用样例
(1)下面是一个 repartition 的使用样例代码:
import org.apache.spark.{SparkConf, SparkContext}
object RepartitionOpScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("RepartitionOpScala")
.setMaster("local")
val sc = new SparkContext(conf)
//设置分区数量为2
val dataRDD = sc.parallelize(Array(1,2,3,4,5),2)
//重新设置RDD的分区数量为3,这个操作会产生shuffle
//也可以解决RDD中数据倾斜的问题
dataRDD.repartition(3)
.foreachPartition(it=>{
println("=========")
it.foreach(println(_))
})
//通过repartition可以控制输出数据产生的文件个数
dataRDD.saveAsTextFile("hdfs://node1:9000/rep-001")
dataRDD.repartition(1).saveAsTextFile("hdfs://node1:9000/rep-002")
sc.stop()
}
}
(2)运行后控制台输出内容如下:

(3)HDFS 上生成的文件如下:

九、算子优化4:reduceByKey 和 groupByKey 的区别
1,功能对比
(1)reduceByKey 和 groupByKey 这两个算子都可以实现分组聚合功能。
(2)比如下面两行代码的最终效果是一样的,都是对 wordCountRDD 中每个单词出现的次数进行聚合统计。
val counts = wordCountRDD.reduceByKey(_ + _) val counts = wordCountRDD.groupByKey().map(wc => (wc._1, wc._2.sum))
2,性能对比
(1)首先这两个算子在执行的时候都会产生 shuffle,但是:
- 当采用 reduceByKey 时,数据在进行 shuffle 之前会先进行局部聚合
- 当使用 groupByKey 时,数据在 shuffle 之前不会进行局部聚合,会原样进行 shuffle
(2)从下图可以看出,reduceByKey 就减少了 shuffle 的数据传送,所以效率会高一些。

(3)因此,由于 reduceByKey 在 shuffle 之前会先对数据进行局部聚合,而 groupByKey 不会,所以在实现分组聚合的需求中,reduceByKey 性能略胜一筹。
全部评论(0)