Spark - 性能优化详解6(算子优化)
作者:hangge | 2024-08-01 08:49
六、算子优化1:map vs mapPartitions
1,基本介绍
(1)map 操作:对 RDD 中的每个元素进行操作,一次处理一条数据。
(2)mapPartitions 操作:对 RDD 中每个 partition 进行操作,一次处理一个分区的数据。
2,二者对比
(1)OOM 方面对比:
- map 操作: 执行 1 次 map 算子只处理 1 个元素,如果 partition 中的元素较多,假设当前已经处理了 1000 个元素,在内存不足的情况下,Spark 可以通过 GC 等方法回收内存(比如将已处理掉的 1000 个元素从内存中回收)。因此, map 操作通常不会导致 OOM 异常;
- mapPartitions 操作: 执行 1 次 map 算子需要接收该 partition 中的所有元素,因此一旦元素很多而内存不足,就容易导致 OOM 的异常。也不是说一定就会产生 OOM 异常,只是和 map 算子对比的话,相对来说容易产生 OOM 异常。
(2)性能方面对比:
- mapPartitions 的性能更高;初始化操作、数据库链接等操作适合使用 mapPartitions 操作。
3,使用建议
(1)假设需要将 RDD 中的每个元素写入数据库中,这时候就应该把创建数据库链接的操作放置在 mapPartitions 中,创建数据库链接这个操作本身就是个比较耗时的,如果该操作放在 map 中执行,将会频繁执行,比较耗时且影响数据库的稳定性。
(2)下面是一个相关的样例代码:
import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable.ArrayBuffer object MapPartitionsOpScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName("MapPartitionsOpScala") .setMaster("local") val sc = new SparkContext(conf) //设置分区数量为2 val dataRDD = sc.parallelize(Array(1,2,3,4,5),2) //map算子一次处理一条数据 /*val sum = dataRDD.map(item=>{ println("==============") item * 2 }).reduce( _ + _)*/ //mapPartitions算子一次处理一个分区的数据 val sum = dataRDD.mapPartitions(it=>{ //建议针对初始化链接之类的操作,使用mapPartitions,放在mapPartitions内部 //例如:创建数据库链接,使用mapPartitions可以减少链接创建的次数,提高性能 //注意:创建数据库链接的代码建议放在次数,不要放在Driver端或者it.foreach内部 //数据库链接放在Driver端会导致链接无法序列化,无法传递到对应的task中执行,所以算子在执行时会报错 //数据库链接放在it.foreach()内部还是会创建多个链接,和使用map算子的效果是一样的 println("==================") val result = new ArrayBuffer[Int]() //这个foreach是调用的scala里面的函数 it.foreach(item=>{ result.+=(item * 2) }) //关闭数据库链接 result.toIterator }).reduce(_ + _) println("sum:"+sum) sc.stop() } }
七、算子优化2:foreach vs foreachPartition
1,二者对比
(1)foreach:一次处理一条数据
(2)foreachPartition:一次处理一个分区的数据
2,优化建议
(1)同上面一样,初始化操作、数据库链接等操作适合使用 foreachPartition 操作。
提示:foreachPartition 的特性和 mapPartitions 的特性是一样的,唯一的区别就是
mapPartitions 是 transformation 操作(不会立即执行),foreachPartition 是 action 操作(会立即执行)
(2)下面是一个相关的样例代码:
import org.apache.spark.{SparkConf, SparkContext} object ForeachPartitionOpScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName("ForeachPartitionOpScala") .setMaster("local") val sc = new SparkContext(conf) //设置分区数量为2 val dataRDD = sc.parallelize(Array(1,2,3,4,5),2) //foreachPartition:一次处理一个分区的数据,作用和mapPartitions类似 //唯一的区是mapPartitions是transformation算子,foreachPartition是action算子 dataRDD.foreachPartition(it=>{ //在此处获取数据库链接 println("===============") it.foreach(item=>{ //在这里使用数据库链接 println(item) }) //关闭数据库链接 }) sc.stop() } }
八、算子优化3:使用repartition对RDD进行重分区
1,repartition 主要应用场景
(1)可以调整 RDD 的并行度
- 针对个别 RDD,如果感觉分区数量不合适,想要调整,可以通过 repartition 进行调整,分区调整了之后,对应的并行度也就可以调整了
(2)可以解决 RDD 中数据倾斜的问题
- 如果 RDD 中不同分区之间的数据出现了数据倾斜,可以通过 repartition 实现数据重新分发,可以均匀分发到不同分区中
2,使用样例
(1)下面是一个 repartition 的使用样例代码:
import org.apache.spark.{SparkConf, SparkContext} object RepartitionOpScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName("RepartitionOpScala") .setMaster("local") val sc = new SparkContext(conf) //设置分区数量为2 val dataRDD = sc.parallelize(Array(1,2,3,4,5),2) //重新设置RDD的分区数量为3,这个操作会产生shuffle //也可以解决RDD中数据倾斜的问题 dataRDD.repartition(3) .foreachPartition(it=>{ println("=========") it.foreach(println(_)) }) //通过repartition可以控制输出数据产生的文件个数 dataRDD.saveAsTextFile("hdfs://node1:9000/rep-001") dataRDD.repartition(1).saveAsTextFile("hdfs://node1:9000/rep-002") sc.stop() } }
(2)运行后控制台输出内容如下:
(3)HDFS 上生成的文件如下:
九、算子优化4:reduceByKey 和 groupByKey 的区别
1,功能对比
(1)reduceByKey 和 groupByKey 这两个算子都可以实现分组聚合功能。
(2)比如下面两行代码的最终效果是一样的,都是对 wordCountRDD 中每个单词出现的次数进行聚合统计。
val counts = wordCountRDD.reduceByKey(_ + _) val counts = wordCountRDD.groupByKey().map(wc => (wc._1, wc._2.sum))
2,性能对比
(1)首先这两个算子在执行的时候都会产生 shuffle,但是:
- 当采用 reduceByKey 时,数据在进行 shuffle 之前会先进行局部聚合
- 当使用 groupByKey 时,数据在 shuffle 之前不会进行局部聚合,会原样进行 shuffle
(2)从下图可以看出,reduceByKey 就减少了 shuffle 的数据传送,所以效率会高一些。
(3)因此,由于 reduceByKey 在 shuffle 之前会先对数据进行局部聚合,而 groupByKey 不会,所以在实现分组聚合的需求中,reduceByKey 性能略胜一筹。
全部评论(0)