返回 导航

Spark

hangge.com

Spark - 性能优化详解4(提高并行度)

作者:hangge | 2024-07-30 08:42

四、提高并行度

1,并行度介绍

(1)Spark 会自动设置以文件作为输入源的 RDD 的并行度,依据其大小,比如 HDFS,就会给每一个 block 创建一个 partition,也依据这个设置并行度。对于 reduceByKey 等会发生 shuffle 操作的算子,会使用并行度最大的父 RDD 的并行度。

(2)我们也可以手动使用 textFile()parallelize() 等方法的第二个参数来设置并行度;也可以使用 spark.default.parallelism 参数,来设置统一的并行度。

2,如何设置合适的并行度

(1)实际上 Spark 集群的资源并不一定会被充分利用到,所以要尽量设置合理的并行度,来充分地利用集群的资源,这样才能提高 Spark 程序的性能。

(2)举个例子,我在 spark-submit 脚本中给任务设置了 5executor,每个 executor 设置了 2cpu core
./bin/spark-submit \
--class MoreParallelismScala \
--master yarn \
--deploy-mode cluster \
--num-executors 5 \
--executor-cores 2 \
spark-1.0-SNAPSHOT-jar-with-dependencies.jar

(3)此时,如果我们在代码中设置了默认并行度为 5
conf.set("spark.default.parallelism","5")

(4)这个参数设置完了以后,也就意味着所有 RDD partition 都被设置成了 5 个,针对 RDD 的每一个 partitionspark 会启动一个 task 来进行计算,所以对于所有的算子操作,都只会创建 5task 来处理对应的 RDD 中的数据。
注意:我们前面在 spark-submit 脚本中设置了 5 executor,每个 executor 2 cpu core,所以这个时候 spark 其实会向 yarn 集群申请 10 cpu core,但是我们在代码中设置了默认并行度为 5,只会产生 5task,一个 task 使用一个 cpu core,那也就意味着有 5cpu core 是空闲的,这样申请的资源就浪费了一半。

(5)最好的情况,就是每个 cpu core 都不闲着,一直在运行,这样可以达到资源的最大使用率。其实让一个 cpu core 运行一个 task 都是有点浪费的,因此官方推荐给每个 cpu 分配 2~3task 是比较合理的,可以充分利用 CPU 资源,发挥它最大的价值。
提示:给每个 cpu 分配 2~3 task 是由于每个 task 执行的顺序和执行结束的时间很大概率是不一样的,如果正好有 10 cpu,运行 10 taks,那么某个 task 可能很快就执行完了,那么这个 CPU 就空闲下来了,这样资源就浪费了。

3,样例演示

(1)假设我们的任务代码如下,我们在代码中将全局并行度设置为 3
import org.apache.spark.{SparkConf, SparkContext}

object MoreParallelismScala {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("MoreParallelismScala")

    //设置全局并行度
    conf.set("spark.default.parallelism","3")

    val sc = new SparkContext(conf)

    val dataRDD = sc.parallelize(Array("hello","you","hello","me","hangge",
      "hello","you","hello","me","bye"))
    dataRDD.map((_,1))
      .reduceByKey(_ + _)
      .foreach(println(_))

    sc.stop()
  }

}

(2)接着对代码编译打包后使用如下 spark-submit 提交这个任务,这里我们为该任务设置了 3executor,每个 executor 设置了 2 cpu core
./bin/spark-submit \
--class MoreParallelismScala \
--master yarn \
--deploy-mode cluster \
--num-executors 3 \
--executor-cores 2 \
spark-1.0-SNAPSHOT-jar-with-dependencies.jar

(3)任务提交到集群运行之后,查看 spark 的任务界面。先看 executors,这里显示了 3executor 进程和 1driver 进程。

  • 然后去看 Stages 界面,两个 Stage 都是 3task 并行执行,这 3task 会使用 3cpu,但是我们给这个任务申请了 6 cpu,所以就有 3 个是空闲的了。

  • 点击某一个 Stage 进去查看详细的 task 信息,可以看到确实只有 3task

(4)如果想要最大限度利用 CPU 的性能,至少将 spark.default.parallelism 的值设置为 6,这样可以实现一个 cpu 运行一个 task
提示:其实按官方推荐给每个 cpu 分配 2~3 task 的话,那么就应该设置为 12 或者 18

(5)其实这个参数也可以在 spark-submit 脚本中动态设置,通过 --conf 参数设置,这样就比较灵活了。这里我们将代码中设置 spark.default.parallelism 的配置注释掉
// conf.set("spark.default.parallelism","3")

(6)重新打包并执行如下命令提交任务,这次我们在 spark-submit 脚本中将并行度设置为 6
./bin/spark-submit \
--class MoreParallelismScala \
--master yarn \
--deploy-mode cluster \
--num-executors 3 \
--executor-cores 2 \
--conf "spark.default.parallelism=6" \
spark-1.0-SNAPSHOT-jar-with-dependencies.jar

(7)执行结束后再来查看 spark 的任务界面,可以看到此时有 6task 并行执行:

  • 然后点击具体的 Stage,进去查看任务信息,可以看到确实有 6 task。这样就能充分利用申请的 cpu 资源。

附:多 executor 模式与多 core 模式比较

1,多 executor 模式

(1)下面方式最终会向集群申请 2cpu core
--num-executors 2
--executor-cores 1

(2)这种方式由于每个 executor 只分配了一个 cpu core,我们将无法利用在同一个 JVM 中运行多个任务的优点。 并且我们假设这两个 executor 是在两个节点中启动的,那么针对广播变量这种操作,将在两个节点的中都复制 1 份,最终会复制两份。

2,多 core 模式

(1)下面方式同样最终会向集群申请 2cpu core
--num-executors 1
--executor-cores 2

(2)此时一个 executor 中会有 2cpu core,这样可以利用同一个 JVM 中运行多个任务的优点,并且针对广播变量的这种操作,只会在这个 executor 对应的节点中复制1份即可。
是不是我可以给一个 executor 分配很多的 cpu core
  • 也不是的,因为一个 executor 的内存大小是固定的,如果在里面运行过多的 task 可能会导致内存不够用,所以这块一般在工作中我们会给一个 executor 分配 2~4G 内存,对应的分配 2~4 cpu core
评论

全部评论(0)

回到顶部