返回 导航

Spark

hangge.com

Spark - 性能优化详解6(算子优化)

作者:hangge | 2024-08-01 08:49

六、算子优化1:map vs mapPartitions

1,基本介绍

(1)map 操作:对 RDD 中的每个元素进行操作,一次处理一条数据。
(2)mapPartitions 操作:对 RDD 中每个 partition 进行操作,一次处理一个分区的数据。

2,二者对比

(1)OOM 方面对比:
  • map 操作: 执行 1map 算子只处理 1 个元素,如果 partition 中的元素较多,假设当前已经处理了 1000 个元素,在内存不足的情况下,Spark 可以通过 GC 等方法回收内存(比如将已处理掉的 1000 个元素从内存中回收)。因此, map 操作通常不会导致 OOM 异常;
  • mapPartitions 操作: 执行 1map 算子需要接收该 partition 中的所有元素,因此一旦元素很多而内存不足,就容易导致 OOM 的异常。也不是说一定就会产生 OOM 异常,只是和 map 算子对比的话,相对来说容易产生 OOM 异常。
(2)性能方面对比:
  • mapPartitions 的性能更高;初始化操作、数据库链接等操作适合使用 mapPartitions 操作。

3,使用建议

(1)假设需要将 RDD 中的每个元素写入数据库中,这时候就应该把创建数据库链接的操作放置在 mapPartitions 中,创建数据库链接这个操作本身就是个比较耗时的,如果该操作放在 map 中执行,将会频繁执行,比较耗时且影响数据库的稳定性。
(2)下面是一个相关的样例代码:
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer

object MapPartitionsOpScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("MapPartitionsOpScala")
      .setMaster("local")
    val sc = new SparkContext(conf)

    //设置分区数量为2
    val dataRDD = sc.parallelize(Array(1,2,3,4,5),2)

    //map算子一次处理一条数据
    /*val sum = dataRDD.map(item=>{
      println("==============")
      item * 2
    }).reduce( _ + _)*/

    //mapPartitions算子一次处理一个分区的数据
    val sum = dataRDD.mapPartitions(it=>{
      //建议针对初始化链接之类的操作,使用mapPartitions,放在mapPartitions内部
      //例如:创建数据库链接,使用mapPartitions可以减少链接创建的次数,提高性能
      //注意:创建数据库链接的代码建议放在次数,不要放在Driver端或者it.foreach内部
      //数据库链接放在Driver端会导致链接无法序列化,无法传递到对应的task中执行,所以算子在执行时会报错
      //数据库链接放在it.foreach()内部还是会创建多个链接,和使用map算子的效果是一样的
      println("==================")
      val result = new ArrayBuffer[Int]()
      //这个foreach是调用的scala里面的函数
      it.foreach(item=>{
        result.+=(item * 2)
      })
      //关闭数据库链接
      result.toIterator
    }).reduce(_ + _)

    println("sum:"+sum)

    sc.stop()
  }
}

七、算子优化2:foreach vs foreachPartition

1,二者对比

(1)foreach:一次处理一条数据
(2)foreachPartition:一次处理一个分区的数据

2,优化建议

(1)同上面一样,初始化操作、数据库链接等操作适合使用 foreachPartition 操作。
提示foreachPartition 的特性和 mapPartitions 的特性是一样的,唯一的区别就是 mapPartitions transformation 操作(不会立即执行),foreachPartition action 操作(会立即执行)
(2)下面是一个相关的样例代码:
import org.apache.spark.{SparkConf, SparkContext}

object ForeachPartitionOpScala {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("ForeachPartitionOpScala")
      .setMaster("local")
    val sc = new SparkContext(conf)

    //设置分区数量为2
    val dataRDD = sc.parallelize(Array(1,2,3,4,5),2)

    //foreachPartition:一次处理一个分区的数据,作用和mapPartitions类似
    //唯一的区是mapPartitions是transformation算子,foreachPartition是action算子
    dataRDD.foreachPartition(it=>{
      //在此处获取数据库链接
      println("===============")
      it.foreach(item=>{
        //在这里使用数据库链接
        println(item)
      })
      //关闭数据库链接
    })

    sc.stop()
  }
}

八、算子优化3:使用repartition对RDD进行重分区

1,repartition 主要应用场景

(1)可以调整 RDD 的并行度
  • 针对个别 RDD,如果感觉分区数量不合适,想要调整,可以通过 repartition 进行调整,分区调整了之后,对应的并行度也就可以调整了
(2)可以解决 RDD 中数据倾斜的问题
  • 如果 RDD 中不同分区之间的数据出现了数据倾斜,可以通过 repartition 实现数据重新分发,可以均匀分发到不同分区中

2,使用样例

(1)下面是一个 repartition 的使用样例代码:
import org.apache.spark.{SparkConf, SparkContext}

object RepartitionOpScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("RepartitionOpScala")
      .setMaster("local")
    val sc = new SparkContext(conf)

    //设置分区数量为2
    val dataRDD = sc.parallelize(Array(1,2,3,4,5),2)

    //重新设置RDD的分区数量为3,这个操作会产生shuffle
    //也可以解决RDD中数据倾斜的问题
    dataRDD.repartition(3)
      .foreachPartition(it=>{
        println("=========")
        it.foreach(println(_))
      })

    //通过repartition可以控制输出数据产生的文件个数
    dataRDD.saveAsTextFile("hdfs://node1:9000/rep-001")
    dataRDD.repartition(1).saveAsTextFile("hdfs://node1:9000/rep-002")

    sc.stop()
  }
}

(2)运行后控制台输出内容如下:

(3)HDFS 上生成的文件如下:

九、算子优化4:reduceByKey 和 groupByKey 的区别

1,功能对比

(1)reduceByKey groupByKey 这两个算子都可以实现分组聚合功能。
(2)比如下面两行代码的最终效果是一样的,都是对 wordCountRDD 中每个单词出现的次数进行聚合统计。
val counts = wordCountRDD.reduceByKey(_ + _)
val counts = wordCountRDD.groupByKey().map(wc => (wc._1, wc._2.sum))

2,性能对比

(1)首先这两个算子在执行的时候都会产生 shuffle,但是:
  • 当采用 reduceByKey 时,数据在进行 shuffle 之前会先进行局部聚合
  • 当使用 groupByKey 时,数据在 shuffle 之前不会进行局部聚合,会原样进行 shuffle
(2)从下图可以看出,reduceByKey 就减少了 shuffle 的数据传送,所以效率会高一些。

(3)因此,由于 reduceByKey shuffle 之前会先对数据进行局部聚合,而 groupByKey 不会,所以在实现分组聚合的需求中,reduceByKey 性能略胜一筹。
评论

全部评论(0)

回到顶部