返回 导航

Spark

hangge.com

Spark - RDD使用详解7(血缘关系、依赖关系、阶段划分、任务划分)

作者:hangge | 2023-10-16 09:00

九、RDD 的血缘关系、依赖关系、阶段划分、任务划分

1,血缘关系

(1)RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。当一个 RDD 通过一系列的转换操作生成新的 RDD 时,新的 RDD 会记录下它依赖的原始 RDD 以及转换操作的步骤,这样就形成了 RDD 之间的血缘关系。当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
(2)使用 RDDtoDebugString() 方法可以查看 RDD 的血缘关系(Lineage)以及转换操作的详细信息。该方法返回一个包含 RDD 血缘关系的字符串表示。
val listRDD = sc.makeRDD(List("welcome to hangge.com", "hello world", "hangge.com"))
println(listRDD.toDebugString)
println("----------------------")

val wordRDD: RDD[String] = listRDD.flatMap(_.split(" "))
println(wordRDD.toDebugString)
println("----------------------")

val mapRDD: RDD[(String, Int)] = wordRDD.map((_,1))
println(mapRDD.toDebugString)
println("----------------------")

val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
println(resultRDD.toDebugString)
resultRDD.collect()

2,依赖关系

(1)依赖关系就是两个相邻 RDD 之间的关系。RDD 的依赖关系分为两种类型:窄依赖(Narrow Dependency)和宽依赖(Wide Dependency)。
  • 窄依赖Narrow Dependency):指父 RDD 的每个分区只被子 RDD 的一个分区所使用,例如 mapfilter 等这些算子。
  • 宽依赖Shuffle Dependency):父 RDD 的每个分区都可能被子 RDD 的多个分区使用,例如 groupByKeyreduceByKeysortBykey 等算子,这些算子其实都会产生 shuffle 操作。

(2)比如下面样例中:
  • wordRDDlistRDDmapRDDwordRDD 就是窄依赖关系(无需进行数据的 Shuffle 洗牌操作)
  • resultRDDmapRDD 则是宽依赖关系(需进行数据的 Shuffle 洗牌操作)
val listRDD = sc.makeRDD(List("welcome to hangge.com", "hello world", "hangge.com"))
println(listRDD.dependencies)
println("----------------------")

val wordRDD: RDD[String] = listRDD.flatMap(_.split(" "))
println(wordRDD.dependencies)
println("----------------------")

val mapRDD: RDD[(String, Int)] = wordRDD.map((_,1))
println(mapRDD.dependencies)
println("----------------------")

val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
println(resultRDD.dependencies)
resultRDD.collect()

(3)窄依赖(Narrow Dependency)指的是一个父 RDD 的每个分区最多只被一个子 RDD 的分区所依赖。在这种情况下,Spark 可以将父 RDD 的每个分区直接映射到子 RDD 的分区上,而无需进行数据的洗牌操作(Shuffle)。窄依赖的转换操作包括 mapfilterunion 等。
  • 下面示例中,rdd2 是通过对 rdd1 应用 map 转换操作而得到的,转换操作是在每个分区上独立执行的,没有涉及数据的洗牌操作,因此 rdd2rdd1 有窄依赖关系。
// 创建RDD
val rdd1 = sc.makeRDD(List(1,2,3,4,5))

// 对RDD进行转换操作
val rdd2 = rdd1.map(_*2)

(4)宽依赖(Wide Dependency)指的是一个父 RDD 的分区被多个子 RDD 的分区所依赖。在这种情况下,Spark 需要进行数据的洗牌操作(Shuffle),将父 RDD 的分区数据重新分发到子 RDD 的分区上。宽依赖的转换操作包括 groupByKeyreduceByKey 等需要数据重组的操作。
  • 下面示例中,rdd2 是通过对 rdd1 应用 groupByKey 转换操作而得到的,groupByKey 需要将相同键的数据进行分组,因此需要进行数据的洗牌操作,rdd2rdd1 有宽依赖关系。
// 创建RDD
val rdd1 = sc.makeRDD(Array((1,"a"),(2,"b"),(3,"c"),(1,"d"),(1,"e"),(2,"f")))

// 对RDD进行转换操作
val rdd2 = rdd1.groupByKey()

3,RDD 阶段划分

(1)RDD 的阶段划分是指将 RDD 的计算流程划分为不同的阶段,每个阶段包含了一组可以并行执行的转换操作。划分阶段的目的是为了减少数据的传输和 shuffle 操作,从而提高计算效率。具体的划分过程如下:
  • RDD 的转换操作通常按照依赖关系被划分为不同的阶段。如果一个 RDD 的依赖关系链上存在宽依赖(即父 RDD 的分区和子 RDD 的分区之间存在多对多的依赖关系),那么该 RDD 的计算过程会被划分为多个阶段。
  • 划分阶段时,Spark 会根据宽依赖的划分策略将父 RDD 的每个分区映射到子 RDD 的每个分区,以形成多对多的依赖关系。
  • RDD 的划分过程还会考虑数据本地性,尽量将具有相同位置的分区放在同一个阶段中,以减少数据的网络传输。

(2)Stage 的划分规则:从后往前,遇到宽依赖就划分 Stage。这是因为 RDD 之间是有血缘关系的,后面的 RDD 依赖前面的 RDD,也就是说后面的 RDD 要等前面的 RDD 执行完,才会执行。所以从后往前遇到宽依赖就划分为两个 stageshuffle 前一个,shuffle 后一个。如果整个过程没有产生 shuffle 那就只会有一个 stage。以下图为例:
  • RDD G 往前推,到 RDD B 的时候,是窄依赖,所以不切分 Stage,再往前到 RDD A,此时产生了宽依赖,所以 RDD A 属于一个 StageRDD B G 属于一个 Stage
  • 再看下面,RDD G RDD F,产生了宽依赖,所以 RDD F 属于一个 Stage,因为 RDD F RDD CDE 这几个 RDD 没有产生宽依赖,都是窄依赖,所以他们属于一个 Stage
  • 所以这个图中,RDD A 单独一个 stage1RDD CDEF 被划分在 stage2 中,最后 RDD B RDD G 划分在了 stage3 里面。
注意Stage 划分是从后往前划分,但是 stage 执行时从前往后的,这就是为什么后面先切割的 stage 为什么编号是 3

(3)DAGDirected Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG 记录了 RDD 的转换过程和任务的阶段。

4,RDD 任务划分

(1)RDD 任务划分可以细分为:ApplicationJobStageTask
  • Application:初始化一个 SparkContext 即生成一个 Application; 
  • Job:一个 Action 算子就会生成一个 Job;  
  • StageStage 等于宽依赖(ShuffleDependency)的个数加 1; 
  • Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。
注意Application -> Job -> Stage -> Task 每一层都是 1 n 的关系。

(2)下面是一个任务划分的图示:
评论

全部评论(0)

回到顶部