返回 导航

Spark

hangge.com

Spark 3.x新特性 - 自适应查询执行详解3(动态优化倾斜的Join)

作者:hangge | 2024-08-08 08:54
    自适应查询执行(Adaptive Query Execution),简称为 AQE。它是对 Spark 执行计划的优化,它可以基于任务运行时统计的数据指标动态修改 Spark 的执行计划。自适应查询执行主要带来了下面这 3 点优化功能:
  • 自适应调整 Shuffle 分区数量。
  • 动态调整 Join 策略。
  • 动态优化倾斜的 Join
    本文接着介绍其中的最后一个动态优化倾斜的 Join

三、动态优化倾斜的 Join

1,策略介绍

(1)在进行 Join 操作的时候,如果数据在多个分区之间分布不均匀,很容易产生数据倾斜,如果数据倾斜比较严重会显著降低计算性能。
  • Spark 3.0 版本之前,如果在 join 的时候遇到了严重的数据倾斜,是需要我们自己对数据进行切分处理,提高计算效率的。

(2)Spark 3.0 有了动态优化倾斜的 Join 这个机制之后就很方便了。该机制会从 Shuffle 文件统计信息中自动检测到这种倾斜,然后它会将倾斜的分区做进一步切分,切分成更小的子分区,这些子分区会连接到对应的分区进行关联。

2,效果演示

(1)假设有两个表 t1 t2,其中表 t1 中的 P0 分区里面的数据量明显大于其他分区,默认的执行情况如下图所示:
  • t1 表中 P0 分区的数据比 p1p2p3 这几个分区的数据大很多,可以认为 t1 表中的数据出现了倾斜。
  • t1 t2 表中 p1p2p3 这几个分区在 join 的时候基本上是不会出现数据倾斜的,因为这些分区的数据相对适中。但是 P0 分区在进行join的时候就会出现数据倾斜了,这样会导致 join 的时间过长。

(2)动态优化倾斜的 Join 机制会把 P0 分区切分成两个子分区 P0-1 P0-2,并将每个子分区关联到表 t2 的对应分区 P0,如下图所示:
  • t2 表中的 P0 分区会复制出来两份相同的数据,和 t1 表中切分出来的 P0 分区的数据进行 Join 关联。
  • 这样相当于就把 t1 表中倾斜的分区拆分打散了,最终在 join 的时候就不会产生数据倾斜了。

(3)如果没有这个优化,将有 4 个任务运行 Join 操作,其中 P0 分区对应的任务将消耗很长时间。优化之后,会有 5 个任务运行 Join 操作,每个任务消耗的时间大致相同,这样就可以获得最优的执行性能了。

3,核心参数

(1)针对动态优化倾斜的 Join 策略的核心参数主要包括下面这 3 个:
核心参数 默认值 解释
spark.sql.adaptive.skewJoin.enabled true 是否开启 AQE 机制中的动态优化倾斜的 Join 机制
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5 数据倾斜判断因子,必须同时满足(数据倾斜判断阈值)
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 268435456b 256M 数据倾斜判断阈值,必须同时满足(数据倾斜判断因子)

(2)如果 Shuffle 中的一个分区的大小大于 skewedPartitionFactor 这个因子乘以 Shuffle 分区中位数的值,并且这个分区也大于 skewedPartitionThresholdInBytes 这个参数的值,则认为这个分区是倾斜的。
提示:理想情况下,skewedPartitionThresholdInBytes 参数的值应该大于 advisoryPartitionSizeInBytes 参数的值。因为后期在切分这个倾斜的分区时会依据 advisoryPartitionSizeInBytes 参数的值进行切分,如果 skewedPartitionThresholdInBytes 参数的值小于 advisoryPartitionSizeInBytes 的值,那就无法切分了。

(3)通过下面这个图,可以更加清晰的理解如何判断数据倾斜:

附:案例演示

1,准备测试数据

(1)首先我们编译要给用于生成测试数据的 Java 类,代码如下:
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;

public class GenerateJSONData {
    public static void main(String[] args) throws Exception{
        //动态优化倾斜的Join-生成测试数据
        generateSkewJoin();
    }
    private static void generateSkewJoin() throws IOException {
        //表t1
        String fileName = "D:\\temp\\spark_json_skew_t1.dat";
        System.out.println("start: 开始生成文件->" + fileName);
        BufferedWriter bfw = new BufferedWriter(new FileWriter(fileName));
        int num = 0;
        while (num < 30000000) {
            bfw.write("{\"id\":\"100" + num + "\",\"score\":"
                    + (45 + new Random().nextInt(50)) + "}");
            bfw.newLine();
            num++;
            if (num % 10000 == 0) {
                bfw.flush();
            }
        }
        bfw.flush();

        //表t2
        fileName = "D:\\temp\\spark_json_skew_t2.dat";
        System.out.println("start: 开始生成文件->" + fileName);
        bfw = new BufferedWriter(new FileWriter(fileName));
        num = 0;
        while (num < 10000) {
            bfw.write("{\"id\":\"100" + num + "\",\"name\":\"zs" + num + "\",\"city\":\"bj\"}");
            bfw.newLine();
            num++;
            if (num % 10000 == 0) {
                bfw.flush();
            }
        }

        while (num < 30000000) {
            bfw.write("{\"id\":\"100\",\"name\":\"zs" + num + "\",\"city\":\"sh\"}");
            bfw.newLine();
            num++;
            if (num % 10000 == 0) {
                bfw.flush();
            }
        }
        bfw.flush();
        bfw.close();
    }
}


(2)然后执行该程序后会在指定目录生成两个数据文件,其中 spark_json_skew_t1.dat 大约 933Mspark_json_skew_t2.dat 大约 1.27G,文件里面为 Json 格式的测试数据:
 

2,测试未开启动态优化倾斜的 Join 功能

(1)为了进行对比试验,我们先开发一个未开启动态优化倾斜的 Join 功能的程序,代码如下:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object AQESkewJoinScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
    //获取SparkSession,为了操作SparkSQL
    val sparkSession = SparkSession
      .builder()
      .appName("AQESkewJoinScala")
      .config(conf)
      //禁用AQE机制
      .config("spark.sql.adaptive.enabled","false")
      //禁用动态优化倾斜的Join(其实只要禁用了AQE机制,这个参数就不用设置了)
      .config("spark.sql.adaptive.skewJoin.enabled","false")
      .getOrCreate()

    //读取Json格式数据,获取DataFrame
    val jsonDf1 = sparkSession.read.json("D:\\temp\\spark_json_skew_t1.dat")
    val jsonDf2 = sparkSession.read.json("D:\\temp\\spark_json_skew_t2.dat")
    //创建临时表
    jsonDf1.createOrReplaceTempView("t1")
    jsonDf2.createOrReplaceTempView("t2")
    //执行SQL语句
    val sql =
      """
        |select
        |   t1.id,
        |   t2.name,
        |   t1.score
        |from t1 join t2
        |   on t1.id = t2.id
        |""".stripMargin
    sparkSession.sql(sql).write.format("json")
      .save("D:\\temp\\skew_join_"+System.currentTimeMillis())

    while (true){
      ;
    }
  }
}

(2)运行程序,然后使用浏览器访问 http://localhost:4040 打开任务界面中查看任务执行情况。可以看到 Spark SQL 中的 Join 功能在运行的时候产生了 200 个任务,因为默认 Shuffle 分区数量是 200,对应产生 200 个任务没有问题。

(3)是仔细查看这 200 个任务,发现其中有一个任务处理的数据量非常大,在 153M 左右。其他的任务处理的数据量很小,都在 1.5M 左右。从这里可以看出来这个 Join 操作在执行的时候出现了数据倾斜。

(4)针对这个任务而言,想要提高计算效率,需要把这个倾斜的分区中的数据进行拆分,拆分成多个子任务去执行,这样就可以了。这个问题正好是动态优化倾斜的 Join 这个功能可以解决的。

3,测试开启动态优化倾斜的 Join 功能

(1)我们修改下代码,验证一下动态优化倾斜的 Join 这个功能:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object AQESkewJoinScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
    //获取SparkSession,为了操作SparkSQL
    val sparkSession = SparkSession
      .builder()
      .appName("AQESkewJoinScala")
      .config(conf)
      //禁用AQE机制
      //.config("spark.sql.adaptive.enabled","false")
      //禁用动态优化倾斜的Join(其实只要禁用了AQE机制,这个参数就不用设置了)
      //.config("spark.sql.adaptive.skewJoin.enabled","false")
      //注意:在验证动态优化倾斜的Join这个功能的时候,最好先把自适应调整Shuffle分区数量这个功能禁用,
      //避免影响结果
      .config("spark.sql.adaptive.coalescePartitions.enabled", "false")
      .getOrCreate()

    //读取Json格式数据,获取DataFrame
    val jsonDf1 = sparkSession.read.json("D:\\temp\\spark_json_skew_t1.dat")
    val jsonDf2 = sparkSession.read.json("D:\\temp\\spark_json_skew_t2.dat")
    //创建临时表
    jsonDf1.createOrReplaceTempView("t1")
    jsonDf2.createOrReplaceTempView("t2")
    //执行SQL语句
    val sql =
      """
        |select
        |   t1.id,
        |   t2.name,
        |   t1.score
        |from t1 join t2
        |   on t1.id = t2.id
        |""".stripMargin
    sparkSession.sql(sql).write.format("json")
      .save("D:\\temp\\skew_join_"+System.currentTimeMillis())

    while (true){
      ;
    }
  }
}

(2)重新执行程序,到任务界面中查看效果。在 Stage 界面中发现还是 200 个任务,任务数量没有增加,也就意味着动态优化倾斜的 Join 没有真正生效。

(3)进入这个 Stage 查看详细信息。发现之前这个 153M 的分区的数据依然存在,没有被分开处理。

(4)为什么动态优化倾斜的 Join 这个功能没有生效呢?我们回头再看一下这个图表中的这 2 个参数:

(5)如果我们开启了动态优化倾斜的 Join 这个功能,并且确实有数据倾斜了,但是没有真正触发执行,那说明这个倾斜的数据没有满足这 2 个参数的指标。

(6)首先看一下第一个条件:skewedPartitionFactor 等于 5分区大小的中位数可以到 SQL 任务的执行流程中查看,这个分区大小的中位数是一个预估值。到任务界面中,点击 SQL 模块查看:
  • 从图中可以看出来分区大小的中位数就是 719 字节。
  • 最终 5 * 719B = 3595B(约等于 0.003M)。
  • 目前这个倾斜的分区是 153M,大于 0.003M,第一个条件满足。其实按照这个条件的话,目前所有分区都是满足的。

(7)接下来看第二个条件:
  • skewedPartitionThresholdInBytes 这个参数默认是 256M,目前我们这个任务中倾斜的分区大小是 153M,不大于 256M,所以第二个条件不满足。
  • 最终 Spark 任务不认为这个分区是倾斜的,所以没有触发动态优化倾斜的 Join 策略的执行。

(8)接下来我们来修改一下代码,让这个 153M 的分区同时满足这两个条件:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object AQESkewJoinScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
    //获取SparkSession,为了操作SparkSQL
    val sparkSession = SparkSession
      .builder()
      .appName("AQESkewJoinScala")
      .config(conf)
      //禁用AQE机制
      //.config("spark.sql.adaptive.enabled","false")
      //禁用动态优化倾斜的Join(其实只要禁用了AQE机制,这个参数就不用设置了)
      //.config("spark.sql.adaptive.skewJoin.enabled","false")
      //注意:在验证动态优化倾斜的Join这个功能的时候,最好先把自适应调整Shuffle分区数量这个功能禁用,
      //避免影响结果
      .config("spark.sql.adaptive.coalescePartitions.enabled", "false")
      //某个分区 > skewedPartitionFactor * 中位数
      .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
      //某个分区 > 100M
      .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "100mb")
      //如果这里指定的分区大小超过了任务中倾斜的分区的大小,这样就无法触发动态优化倾斜的Join这个功能了
      //建议这里设置的分区大小最大也不能超过skewedPartitionThresholdInBytes的值(100M)
      .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64mb")
      .getOrCreate()

    //读取Json格式数据,获取DataFrame
    val jsonDf1 = sparkSession.read.json("D:\\temp\\spark_json_skew_t1.dat")
    val jsonDf2 = sparkSession.read.json("D:\\temp\\spark_json_skew_t2.dat")
    //创建临时表
    jsonDf1.createOrReplaceTempView("t1")
    jsonDf2.createOrReplaceTempView("t2")
    //执行SQL语句
    val sql =
      """
        |select
        |   t1.id,
        |   t2.name,
        |   t1.score
        |from t1 join t2
        |   on t1.id = t2.id
        |""".stripMargin
    sparkSession.sql(sql).write.format("json")
      .save("D:\\temp\\skew_join_"+System.currentTimeMillis())

    while (true){
      ;
    }
  }
}

(9)重新执行程序,到任务界面中查看效果。此时到 Stage 界面中发现产生了 202 个任务。

(10)点击查看这个 Stage 的详细信息。相当于把这个 153M 的分区,大致按照 64M,切分成了 3 份:61M60M 34M
  • 之前是 1 个任务处理这 153M 的数据,现在是有 3 个任务一起来处理这 153M 的数据,这样就解决了数据倾斜的问题,提高了任务的计算效率。
提示:
  • 并不是完全按照 64M 来切分,会大致按照 64M 来切分。
  • 如果感觉这样拆分还是有点大,则可以对应的调整 advisoryPartitionSizeInBytes 参数的值,这个参数的值越小,拆分出来的分区就越多,但是太多了也不好,那样就是很多小任务了。
评论

全部评论(0)

回到顶部