Spark 3.x新特性 - 自适应查询执行详解3(动态优化倾斜的Join)
作者:hangge | 2024-08-08 08:54
自适应查询执行(Adaptive Query Execution),简称为 AQE。它是对 Spark 执行计划的优化,它可以基于任务运行时统计的数据指标动态修改 Spark 的执行计划。自适应查询执行主要带来了下面这 3 点优化功能:
- 自适应调整 Shuffle 分区数量。
- 动态调整 Join 策略。
- 动态优化倾斜的 Join。
本文接着介绍其中的最后一个动态优化倾斜的 Join。
三、动态优化倾斜的 Join
1,策略介绍
(1)在进行 Join 操作的时候,如果数据在多个分区之间分布不均匀,很容易产生数据倾斜,如果数据倾斜比较严重会显著降低计算性能。
- 在 Spark 3.0 版本之前,如果在 join 的时候遇到了严重的数据倾斜,是需要我们自己对数据进行切分处理,提高计算效率的。
(2)Spark 3.0 有了动态优化倾斜的 Join 这个机制之后就很方便了。该机制会从 Shuffle 文件统计信息中自动检测到这种倾斜,然后它会将倾斜的分区做进一步切分,切分成更小的子分区,这些子分区会连接到对应的分区进行关联。
2,效果演示
(1)假设有两个表 t1 和 t2,其中表 t1 中的 P0 分区里面的数据量明显大于其他分区,默认的执行情况如下图所示:
- t1 表中 P0 分区的数据比 p1、p2、p3 这几个分区的数据大很多,可以认为 t1 表中的数据出现了倾斜。
- 当 t1 和 t2 表中 p1、p2、p3 这几个分区在 join 的时候基本上是不会出现数据倾斜的,因为这些分区的数据相对适中。但是 P0 分区在进行join的时候就会出现数据倾斜了,这样会导致 join 的时间过长。
(2)动态优化倾斜的 Join 机制会把 P0 分区切分成两个子分区 P0-1 和 P0-2,并将每个子分区关联到表 t2 的对应分区 P0,如下图所示:
- t2 表中的 P0 分区会复制出来两份相同的数据,和 t1 表中切分出来的 P0 分区的数据进行 Join 关联。
- 这样相当于就把 t1 表中倾斜的分区拆分打散了,最终在 join 的时候就不会产生数据倾斜了。
(3)如果没有这个优化,将有 4 个任务运行 Join 操作,其中 P0 分区对应的任务将消耗很长时间。优化之后,会有 5 个任务运行 Join 操作,每个任务消耗的时间大致相同,这样就可以获得最优的执行性能了。
3,核心参数
(1)针对动态优化倾斜的 Join 策略的核心参数主要包括下面这 3 个:
核心参数 | 默认值 | 解释 |
spark.sql.adaptive.skewJoin.enabled | true | 是否开启 AQE 机制中的动态优化倾斜的 Join 机制 |
spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5 | 数据倾斜判断因子,必须同时满足(数据倾斜判断阈值) |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 268435456b (256M) | 数据倾斜判断阈值,必须同时满足(数据倾斜判断因子) |
(2)如果 Shuffle 中的一个分区的大小大于 skewedPartitionFactor 这个因子乘以 Shuffle 分区中位数的值,并且这个分区也大于 skewedPartitionThresholdInBytes 这个参数的值,则认为这个分区是倾斜的。
提示:理想情况下,skewedPartitionThresholdInBytes 参数的值应该大于 advisoryPartitionSizeInBytes 参数的值。因为后期在切分这个倾斜的分区时会依据 advisoryPartitionSizeInBytes 参数的值进行切分,如果 skewedPartitionThresholdInBytes 参数的值小于 advisoryPartitionSizeInBytes 的值,那就无法切分了。
(3)通过下面这个图,可以更加清晰的理解如何判断数据倾斜:
- 如果分区 A 中的数据大小大于 skewedPartitionFactor * 分区大小的中位数。并且分区 A 中的数据大小也大于 skewedPartitionThresholdInBytes 参数的值,则分区 A 就是一个倾斜的分区了,那也就意味着这个任务中的数据出现了数据倾斜,这样才会触发动态优化倾斜的 Join 功能。
附:案例演示
1,准备测试数据
(1)首先我们编译要给用于生成测试数据的 Java 类,代码如下:
import java.io.BufferedWriter; import java.io.FileWriter; import java.io.IOException; import java.util.Random; public class GenerateJSONData { public static void main(String[] args) throws Exception{ //动态优化倾斜的Join-生成测试数据 generateSkewJoin(); } private static void generateSkewJoin() throws IOException { //表t1 String fileName = "D:\\temp\\spark_json_skew_t1.dat"; System.out.println("start: 开始生成文件->" + fileName); BufferedWriter bfw = new BufferedWriter(new FileWriter(fileName)); int num = 0; while (num < 30000000) { bfw.write("{\"id\":\"100" + num + "\",\"score\":" + (45 + new Random().nextInt(50)) + "}"); bfw.newLine(); num++; if (num % 10000 == 0) { bfw.flush(); } } bfw.flush(); //表t2 fileName = "D:\\temp\\spark_json_skew_t2.dat"; System.out.println("start: 开始生成文件->" + fileName); bfw = new BufferedWriter(new FileWriter(fileName)); num = 0; while (num < 10000) { bfw.write("{\"id\":\"100" + num + "\",\"name\":\"zs" + num + "\",\"city\":\"bj\"}"); bfw.newLine(); num++; if (num % 10000 == 0) { bfw.flush(); } } while (num < 30000000) { bfw.write("{\"id\":\"100\",\"name\":\"zs" + num + "\",\"city\":\"sh\"}"); bfw.newLine(); num++; if (num % 10000 == 0) { bfw.flush(); } } bfw.flush(); bfw.close(); } }
2,测试未开启动态优化倾斜的 Join 功能
(1)为了进行对比试验,我们先开发一个未开启动态优化倾斜的 Join 功能的程序,代码如下:
import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object AQESkewJoinScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") //获取SparkSession,为了操作SparkSQL val sparkSession = SparkSession .builder() .appName("AQESkewJoinScala") .config(conf) //禁用AQE机制 .config("spark.sql.adaptive.enabled","false") //禁用动态优化倾斜的Join(其实只要禁用了AQE机制,这个参数就不用设置了) .config("spark.sql.adaptive.skewJoin.enabled","false") .getOrCreate() //读取Json格式数据,获取DataFrame val jsonDf1 = sparkSession.read.json("D:\\temp\\spark_json_skew_t1.dat") val jsonDf2 = sparkSession.read.json("D:\\temp\\spark_json_skew_t2.dat") //创建临时表 jsonDf1.createOrReplaceTempView("t1") jsonDf2.createOrReplaceTempView("t2") //执行SQL语句 val sql = """ |select | t1.id, | t2.name, | t1.score |from t1 join t2 | on t1.id = t2.id |""".stripMargin sparkSession.sql(sql).write.format("json") .save("D:\\temp\\skew_join_"+System.currentTimeMillis()) while (true){ ; } } }
(2)运行程序,然后使用浏览器访问 http://localhost:4040 打开任务界面中查看任务执行情况。可以看到 Spark SQL 中的 Join 功能在运行的时候产生了 200 个任务,因为默认 Shuffle 分区数量是 200,对应产生 200 个任务没有问题。
(3)是仔细查看这 200 个任务,发现其中有一个任务处理的数据量非常大,在 153M 左右。其他的任务处理的数据量很小,都在 1.5M 左右。从这里可以看出来这个 Join 操作在执行的时候出现了数据倾斜。
(4)针对这个任务而言,想要提高计算效率,需要把这个倾斜的分区中的数据进行拆分,拆分成多个子任务去执行,这样就可以了。这个问题正好是动态优化倾斜的 Join 这个功能可以解决的。
3,测试开启动态优化倾斜的 Join 功能
(1)我们修改下代码,验证一下动态优化倾斜的 Join 这个功能:
import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object AQESkewJoinScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") //获取SparkSession,为了操作SparkSQL val sparkSession = SparkSession .builder() .appName("AQESkewJoinScala") .config(conf) //禁用AQE机制 //.config("spark.sql.adaptive.enabled","false") //禁用动态优化倾斜的Join(其实只要禁用了AQE机制,这个参数就不用设置了) //.config("spark.sql.adaptive.skewJoin.enabled","false") //注意:在验证动态优化倾斜的Join这个功能的时候,最好先把自适应调整Shuffle分区数量这个功能禁用, //避免影响结果 .config("spark.sql.adaptive.coalescePartitions.enabled", "false") .getOrCreate() //读取Json格式数据,获取DataFrame val jsonDf1 = sparkSession.read.json("D:\\temp\\spark_json_skew_t1.dat") val jsonDf2 = sparkSession.read.json("D:\\temp\\spark_json_skew_t2.dat") //创建临时表 jsonDf1.createOrReplaceTempView("t1") jsonDf2.createOrReplaceTempView("t2") //执行SQL语句 val sql = """ |select | t1.id, | t2.name, | t1.score |from t1 join t2 | on t1.id = t2.id |""".stripMargin sparkSession.sql(sql).write.format("json") .save("D:\\temp\\skew_join_"+System.currentTimeMillis()) while (true){ ; } } }
(2)重新执行程序,到任务界面中查看效果。在 Stage 界面中发现还是 200 个任务,任务数量没有增加,也就意味着动态优化倾斜的 Join 没有真正生效。
(3)进入这个 Stage 查看详细信息。发现之前这个 153M 的分区的数据依然存在,没有被分开处理。
(4)为什么动态优化倾斜的 Join 这个功能没有生效呢?我们回头再看一下这个图表中的这 2 个参数:
(5)如果我们开启了动态优化倾斜的 Join 这个功能,并且确实有数据倾斜了,但是没有真正触发执行,那说明这个倾斜的数据没有满足这 2 个参数的指标。
(6)首先看一下第一个条件:skewedPartitionFactor 等于 5。分区大小的中位数可以到 SQL 任务的执行流程中查看,这个分区大小的中位数是一个预估值。到任务界面中,点击 SQL 模块查看:
- 从图中可以看出来分区大小的中位数就是 719 字节。
- 最终 5 * 719B = 3595B(约等于 0.003M)。
- 目前这个倾斜的分区是 153M,大于 0.003M,第一个条件满足。其实按照这个条件的话,目前所有分区都是满足的。
(7)接下来看第二个条件:
- skewedPartitionThresholdInBytes 这个参数默认是 256M,目前我们这个任务中倾斜的分区大小是 153M,不大于 256M,所以第二个条件不满足。
- 最终 Spark 任务不认为这个分区是倾斜的,所以没有触发动态优化倾斜的 Join 策略的执行。
(8)接下来我们来修改一下代码,让这个 153M 的分区同时满足这两个条件:
import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object AQESkewJoinScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") //获取SparkSession,为了操作SparkSQL val sparkSession = SparkSession .builder() .appName("AQESkewJoinScala") .config(conf) //禁用AQE机制 //.config("spark.sql.adaptive.enabled","false") //禁用动态优化倾斜的Join(其实只要禁用了AQE机制,这个参数就不用设置了) //.config("spark.sql.adaptive.skewJoin.enabled","false") //注意:在验证动态优化倾斜的Join这个功能的时候,最好先把自适应调整Shuffle分区数量这个功能禁用, //避免影响结果 .config("spark.sql.adaptive.coalescePartitions.enabled", "false") //某个分区 > skewedPartitionFactor * 中位数 .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5") //某个分区 > 100M .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "100mb") //如果这里指定的分区大小超过了任务中倾斜的分区的大小,这样就无法触发动态优化倾斜的Join这个功能了 //建议这里设置的分区大小最大也不能超过skewedPartitionThresholdInBytes的值(100M) .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64mb") .getOrCreate() //读取Json格式数据,获取DataFrame val jsonDf1 = sparkSession.read.json("D:\\temp\\spark_json_skew_t1.dat") val jsonDf2 = sparkSession.read.json("D:\\temp\\spark_json_skew_t2.dat") //创建临时表 jsonDf1.createOrReplaceTempView("t1") jsonDf2.createOrReplaceTempView("t2") //执行SQL语句 val sql = """ |select | t1.id, | t2.name, | t1.score |from t1 join t2 | on t1.id = t2.id |""".stripMargin sparkSession.sql(sql).write.format("json") .save("D:\\temp\\skew_join_"+System.currentTimeMillis()) while (true){ ; } } }
(9)重新执行程序,到任务界面中查看效果。此时到 Stage 界面中发现产生了 202 个任务。
(10)点击查看这个 Stage 的详细信息。相当于把这个 153M 的分区,大致按照 64M,切分成了 3 份:61M、60M 和 34M。
- 之前是 1 个任务处理这 153M 的数据,现在是有 3 个任务一起来处理这 153M 的数据,这样就解决了数据倾斜的问题,提高了任务的计算效率。
提示:
- 并不是完全按照 64M 来切分,会大致按照 64M 来切分。
- 如果感觉这样拆分还是有点大,则可以对应的调整 advisoryPartitionSizeInBytes 参数的值,这个参数的值越小,拆分出来的分区就越多,但是太多了也不好,那样就是很多小任务了。
全部评论(0)