Spark - 性能优化详解4(提高并行度)
作者:hangge | 2024-07-30 08:42
四、提高并行度
1,并行度介绍
(1)Spark 会自动设置以文件作为输入源的 RDD 的并行度,依据其大小,比如 HDFS,就会给每一个 block 创建一个 partition,也依据这个设置并行度。对于 reduceByKey 等会发生 shuffle 操作的算子,会使用并行度最大的父 RDD 的并行度。
(2)我们也可以手动使用 textFile()、parallelize() 等方法的第二个参数来设置并行度;也可以使用 spark.default.parallelism 参数,来设置统一的并行度。
2,如何设置合适的并行度
(1)实际上 Spark 集群的资源并不一定会被充分利用到,所以要尽量设置合理的并行度,来充分地利用集群的资源,这样才能提高 Spark 程序的性能。
(2)举个例子,我在 spark-submit 脚本中给任务设置了 5 个 executor,每个 executor 设置了 2 个 cpu core
./bin/spark-submit \ --class MoreParallelismScala \ --master yarn \ --deploy-mode cluster \ --num-executors 5 \ --executor-cores 2 \ spark-1.0-SNAPSHOT-jar-with-dependencies.jar
(3)此时,如果我们在代码中设置了默认并行度为 5:
conf.set("spark.default.parallelism","5")
(4)这个参数设置完了以后,也就意味着所有 RDD 的 partition 都被设置成了 5 个,针对 RDD 的每一个 partition,spark 会启动一个 task 来进行计算,所以对于所有的算子操作,都只会创建 5 个 task 来处理对应的 RDD 中的数据。
注意:我们前面在 spark-submit 脚本中设置了 5 个 executor,每个 executor 2 个 cpu core,所以这个时候 spark 其实会向 yarn 集群申请 10 个 cpu core,但是我们在代码中设置了默认并行度为 5,只会产生 5 个 task,一个 task 使用一个 cpu core,那也就意味着有 5 个 cpu core 是空闲的,这样申请的资源就浪费了一半。
提示:给每个 cpu 分配 2~3 个 task 是由于每个 task 执行的顺序和执行结束的时间很大概率是不一样的,如果正好有 10 个 cpu,运行 10 个 taks,那么某个 task 可能很快就执行完了,那么这个 CPU 就空闲下来了,这样资源就浪费了。
3,样例演示
(1)假设我们的任务代码如下,我们在代码中将全局并行度设置为 3:
import org.apache.spark.{SparkConf, SparkContext} object MoreParallelismScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName("MoreParallelismScala") //设置全局并行度 conf.set("spark.default.parallelism","3") val sc = new SparkContext(conf) val dataRDD = sc.parallelize(Array("hello","you","hello","me","hangge", "hello","you","hello","me","bye")) dataRDD.map((_,1)) .reduceByKey(_ + _) .foreach(println(_)) sc.stop() } }
(2)接着对代码编译打包后使用如下 spark-submit 提交这个任务,这里我们为该任务设置了 3 个 executor,每个 executor 设置了 2 个 cpu core。
./bin/spark-submit \ --class MoreParallelismScala \ --master yarn \ --deploy-mode cluster \ --num-executors 3 \ --executor-cores 2 \ spark-1.0-SNAPSHOT-jar-with-dependencies.jar
(3)任务提交到集群运行之后,查看 spark 的任务界面。先看 executors,这里显示了 3 个 executor 进程和 1 个 driver 进程。
- 然后去看 Stages 界面,两个 Stage 都是 3 个 task 并行执行,这 3 个 task 会使用 3 个 cpu,但是我们给这个任务申请了 6 个 cpu,所以就有 3 个是空闲的了。
- 点击某一个 Stage 进去查看详细的 task 信息,可以看到确实只有 3 个 task。
(4)如果想要最大限度利用 CPU 的性能,至少将 spark.default.parallelism 的值设置为 6,这样可以实现一个 cpu 运行一个 task。
提示:其实按官方推荐给每个 cpu 分配 2~3 个 task 的话,那么就应该设置为 12 或者 18。
(5)其实这个参数也可以在 spark-submit 脚本中动态设置,通过 --conf 参数设置,这样就比较灵活了。这里我们将代码中设置 spark.default.parallelism 的配置注释掉
// conf.set("spark.default.parallelism","3")
(6)重新打包并执行如下命令提交任务,这次我们在 spark-submit 脚本中将并行度设置为 6:
./bin/spark-submit \ --class MoreParallelismScala \ --master yarn \ --deploy-mode cluster \ --num-executors 3 \ --executor-cores 2 \ --conf "spark.default.parallelism=6" \ spark-1.0-SNAPSHOT-jar-with-dependencies.jar
(7)执行结束后再来查看 spark 的任务界面,可以看到此时有 6 个 task 并行执行:
- 然后点击具体的 Stage,进去查看任务信息,可以看到确实有 6 个 task。这样就能充分利用申请的 cpu 资源。
附:多 executor 模式与多 core 模式比较
1,多 executor 模式
(1)下面方式最终会向集群申请 2 个 cpu core:
--num-executors 2 --executor-cores 1
(2)这种方式由于每个 executor 只分配了一个 cpu core,我们将无法利用在同一个 JVM 中运行多个任务的优点。 并且我们假设这两个 executor 是在两个节点中启动的,那么针对广播变量这种操作,将在两个节点的中都复制 1 份,最终会复制两份。
2,多 core 模式
(1)下面方式同样最终会向集群申请 2 个 cpu core:
--num-executors 1 --executor-cores 2
(2)此时一个 executor 中会有 2 个 cpu core,这样可以利用同一个 JVM 中运行多个任务的优点,并且针对广播变量的这种操作,只会在这个 executor 对应的节点中复制1份即可。
是不是我可以给一个 executor 分配很多的 cpu core?
- 也不是的,因为一个 executor 的内存大小是固定的,如果在里面运行过多的 task 可能会导致内存不够用,所以这块一般在工作中我们会给一个 executor 分配 2~4G 内存,对应的分配 2~4 个 cpu core。
全部评论(0)