返回 导航

Spark

hangge.com

Spark 3.x新特性 - 自适应查询执行详解2(动态调整Join策略)

作者:hangge | 2024-08-07 08:41
    自适应查询执行(Adaptive Query Execution),简称为 AQE。它是对 Spark 执行计划的优化,它可以基于任务运行时统计的数据指标动态修改 Spark 的执行计划。自适应查询执行主要带来了下面这 3 点优化功能:
  • 自适应调整 Shuffle 分区数量。
  • 动态调整 Join 策略。
  • 动态优化倾斜的 Join
    本文接着介绍其中的第二个动态调整 Join 策略。

二、动态调整 Join 策略

1,策略介绍

(1)Spark 中支持多种 Join 策略,其中 BroadcastHashJoin 的性能通常是最好的,但是前提是参加 Join 的其中一张表的数据能够存入内存。

(2)基于这个原因,当 Spark 评估参加 Join 的表的数据量小于广播大小的阈值时,会将 Join 策略调整为 BroadcastHashJoin。广播大小的阈值默认是 10M
  • 但是,很多情况都可能导致这种大小的评估出错。例如:Join 的时候 SQL 语句中存在过滤器。

(3)开启了自适应查询执行机制之后,可以在运行时根据最精确的数据指标重新规划 Join 策略,实现动态调整 Join 策略。
  • 以下图为例,我们可以看到,对表 t2 进行过滤之后的数据大小比预估值小得多,并且小到足以进行广播,因此在重新优化之后,之前静态生成的 SortMergeJoin 策略就会被转换为 BroadcastHashJoin 策略了。

2,参数说明

(1)针对动态调整 Join 策略的核心参数主要包括下面这 1 个:
核心参数 默认值 解释
spark.sql.adaptive.autoBroadcastJoinThreshold none 设置允许广播的表的最大值。设置为 -1 表示禁用。如果未设置会参考 spark.sql.autoBroadcastJoinThreshold 参数的值(10M)。

(2)spark.sql.adaptive.autoBroadcastJoinThreshold 这个参数没有默认值,通过这个参数可以控制允许广播的表的最大值。
  • 当两个表进行 join 的时候,如果一个表比较小,可以通过广播机制广播出去,这样就可以把本来是 reduce 端的 join,改为 map 端的 join,提高 join 效率。
  • 如果把这个参数的值设置为 -1,表示禁用自动广播策略。
  • 如果我们没有给这个参数设置值,则默认会使用 spark.sql.autoBroadcastJoinThreshold 参数的值,这个参数的值默认是 10M。那也就是说当一个表中的数据小于 10M 的时候在这里支持将这个表广播出去。

附:案例演示

1,准备测试数据

(1)首先我们编译要给用于生成测试数据的 Java 类,代码如下:
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;

public class GenerateJSONData {
    public static void main(String[] args) throws Exception{
        //动态调整Join策略-生成测试数据
        generateJoinData();
    }
    private static void generateJoinData() throws IOException {
        //表t1
        String fileName = "D:\\temp\\spark_json_t1.dat";
        System.out.println("start: 开始生成文件->"+fileName);
        BufferedWriter bfw = new BufferedWriter(new FileWriter(fileName));
        int num = 0;
        while(num<3000000){
            bfw.write("{\"id\":\"100"+num+"\",\"score\":"+(45+new Random().nextInt(50))+"}");
            bfw.newLine();
            num ++;
            if(num%10000==0){
                bfw.flush();
            }
        }
        bfw.flush();

        //表t2
        fileName = "D:\\temp\\spark_json_t2.dat";
        System.out.println("start: 开始生成文件->"+fileName);
        bfw = new BufferedWriter(new FileWriter(fileName));
        num = 0;
        while(num<10000){
            bfw.write("{\"id\":\"100"+num+"\",\"name\":\"zs"+num+"\",\"city\":\"bj\"}");
            bfw.newLine();
            num ++;
            if(num%10000==0){
                bfw.flush();
            }
        }

        while(num<3000000){
            bfw.write("{\"id\":\"100"+num+"\",\"name\":\"zs"+num+"\",\"city\":\"sh\"}");
            bfw.newLine();
            num ++;
            if(num%10000==0){
                bfw.flush();
            }
        }
        bfw.flush();
        bfw.close();
    }
}

(2)然后执行该程序后会在指定目录生成两个数据文件,其中 spark_json_t1.dat 大约 90Mspark_json_t2.dat 大约 146M,文件里面为 Json 格式的测试数据:

2,测试未开启动态调整 Join 策略

(1)为了对比实验,在这里我们先开发一个未开启动态调整 Join 策略的程序。
注意:因为针对动态调整 Join 策略这个功能没有单独提供参数进行控制,只要开启了自适应查询执行,当满足条件的时候,动态调整 Join 策略就会触发了。因此这里我们关闭自适应查询执行。
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object AQEJoinScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
    //获取SparkSession,为了操作SparkSQL
    val sparkSession = SparkSession
      .builder()
      .appName("AQEJoinScala")
      .config(conf)
      .config("spark.sql.adaptive.enabled","false")
      .getOrCreate()

    //读取Json格式数据,获取DataFrame
    val jsonDf1 = sparkSession.read.json("D:\\temp\\spark_json_t1.dat")
    val jsonDf2 = sparkSession.read.json("D:\\temp\\spark_json_t2.dat")
    //创建临时表
    jsonDf1.createOrReplaceTempView("t1")
    jsonDf2.createOrReplaceTempView("t2")
    //执行SQL语句
    val sql =
      """
        |select
        |   t1.id,
        |   t2.name,
        |   t1.score
        |from t1 join t2
        |   on t1.id = t2.id
        |   and t2.city like 'bj'
        |""".stripMargin
    sparkSession.sql(sql).write.format("json")
      .save("D:\\temp\\join_"+System.currentTimeMillis())

    //让程序一直运行,这样本地的Spark任务界面就可以一直查看
    while (true){
      ;
    }
  }
}

(2)然后使用浏览器访问 http://localhost:4040 打开任务界面中查看任务执行情况:

(3)从下面这个图里面可以看出来:
  • 此时JOIN的时候使用的是 SortMergeJoin 策略。这个策略是任务一开始的时候就指定好的,因为前期任务无法真正知道过滤后的 t2 表有多大,原始的 t2 表有 146M,原始的 t1 表有 90M,所以不满足广播策略的条件。
  • 此外,在这里其实可以看到,真正过滤后的数据只有 139KB,所以 t2 这个表过滤后的大小是小于广播阈值的,正常情况下在 Join 的时候是应该广播出去的,这样才可以提高计算效率的。但是由于此时没有开启自适应查询执行策略,任务运行期间无法根据数据的实际大小修改执行计划。

3,测试开启动态调整 Join 策略

(1)接下来修改代码,开启动态调整 Join 策略,其实也就是开启自适应查询执行机制。
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object AQEJoinScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
    //获取SparkSession,为了操作SparkSQL
    val sparkSession = SparkSession
      .builder()
      .appName("AQEJoinScala")
      .config(conf)
      //.config("spark.sql.adaptive.enabled","false")
      .getOrCreate()

    //读取Json格式数据,获取DataFrame
    val jsonDf1 = sparkSession.read.json("D:\\temp\\spark_json_t1.dat")
    val jsonDf2 = sparkSession.read.json("D:\\temp\\spark_json_t2.dat")
    //创建临时表
    jsonDf1.createOrReplaceTempView("t1")
    jsonDf2.createOrReplaceTempView("t2")
    //执行SQL语句
    val sql =
      """
        |select
        |   t1.id,
        |   t2.name,
        |   t1.score
        |from t1 join t2
        |   on t1.id = t2.id
        |   and t2.city like 'bj'
        |""".stripMargin
    sparkSession.sql(sql).write.format("json")
      .save("D:\\temp\\join_"+System.currentTimeMillis())

    //让程序一直运行,这样本地的Spark任务界面就可以一直查看
    while (true){
      ;
    }
  }
}

(2)执行代码,任务在刚开始运行的时候我们来查看这个执行计划,可以发现默认使用的还是 SortMergeJoin 策略,如下图所示:

(3)随着任务的进一步执行,当 Spark 获取到 t2 表过滤后的真实数据大小以后,会对之前生成的执行计划进行优化,改为使用 BroadcastHashJoin 策略。
  • 因为针对 t2 表,通过 like 过滤之后剩下的真实数据只有 139KB,这个数据大小是小于 10M 的,可以广播出去提高 Join 的效率。

(4)这些数据大小在 stage 界面中也可以看到:

(5)针对这两种情况的执行效果进行对比,如下图所示,开启了自适应查询执行机制之后,会在运行期间获取到 t2 表过滤后的真实数据,从而修改之前的执行策略。

4,禁用自动广播机制

(1)当我们把 spark.sql.adaptive.autoBroadcastJoinThreshold 参数设置为 -1 的时候,可以禁用 sparksql 中的自动广播机制,就算开启了自适应查询执行机制,也无法转换为 BroadcastHashJoin 策略。
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object AQEJoinScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
    //获取SparkSession,为了操作SparkSQL
    val sparkSession = SparkSession
      .builder()
      .appName("AQEJoinScala")
      .config(conf)
      //.config("spark.sql.adaptive.enabled","false")
      //禁用自动广播机制
      .config("spark.sql.adaptive.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()

    //读取Json格式数据,获取DataFrame
    val jsonDf1 = sparkSession.read.json("D:\\temp\\spark_json_t1.dat")
    val jsonDf2 = sparkSession.read.json("D:\\temp\\spark_json_t2.dat")
    //创建临时表
    jsonDf1.createOrReplaceTempView("t1")
    jsonDf2.createOrReplaceTempView("t2")
    //执行SQL语句
    val sql =
      """
        |select
        |   t1.id,
        |   t2.name,
        |   t1.score
        |from t1 join t2
        |   on t1.id = t2.id
        |   and t2.city like 'bj'
        |""".stripMargin
    sparkSession.sql(sql).write.format("json")
      .save("D:\\temp\\join_"+System.currentTimeMillis())

    //让程序一直运行,这样本地的Spark任务界面就可以一直查看
    while (true){
      ;
    }
  }
}

(2)执行代码,查看任务界面效果。此时发现就算是开启了自适应查询执行机制,依然还是使用的 SortMergeJoin 策略。
评论

全部评论(0)

回到顶部