Spark 3.x新特性 - 自适应查询执行详解2(动态调整Join策略)
作者:hangge | 2024-08-07 08:41
自适应查询执行(Adaptive Query Execution),简称为 AQE。它是对 Spark 执行计划的优化,它可以基于任务运行时统计的数据指标动态修改 Spark 的执行计划。自适应查询执行主要带来了下面这 3 点优化功能:
- 自适应调整 Shuffle 分区数量。
- 动态调整 Join 策略。
- 动态优化倾斜的 Join。
本文接着介绍其中的第二个动态调整 Join 策略。
二、动态调整 Join 策略
1,策略介绍
(1)Spark 中支持多种 Join 策略,其中 BroadcastHashJoin 的性能通常是最好的,但是前提是参加 Join 的其中一张表的数据能够存入内存。
(2)基于这个原因,当 Spark 评估参加 Join 的表的数据量小于广播大小的阈值时,会将 Join 策略调整为 BroadcastHashJoin。广播大小的阈值默认是 10M。
- 但是,很多情况都可能导致这种大小的评估出错。例如:Join 的时候 SQL 语句中存在过滤器。
(3)开启了自适应查询执行机制之后,可以在运行时根据最精确的数据指标重新规划 Join 策略,实现动态调整 Join 策略。
- 以下图为例,我们可以看到,对表 t2 进行过滤之后的数据大小比预估值小得多,并且小到足以进行广播,因此在重新优化之后,之前静态生成的 SortMergeJoin 策略就会被转换为 BroadcastHashJoin 策略了。
2,参数说明
(1)针对动态调整 Join 策略的核心参数主要包括下面这 1 个:
核心参数 | 默认值 | 解释 |
spark.sql.adaptive.autoBroadcastJoinThreshold | none | 设置允许广播的表的最大值。设置为 -1 表示禁用。如果未设置会参考 spark.sql.autoBroadcastJoinThreshold 参数的值(10M)。 |
(2)spark.sql.adaptive.autoBroadcastJoinThreshold 这个参数没有默认值,通过这个参数可以控制允许广播的表的最大值。
(2)然后执行该程序后会在指定目录生成两个数据文件,其中 spark_json_t1.dat 大约 90M,spark_json_t2.dat 大约 146M,文件里面为 Json 格式的测试数据:
(2)然后使用浏览器访问 http://localhost:4040 打开任务界面中查看任务执行情况:
(2)执行代码,任务在刚开始运行的时候我们来查看这个执行计划,可以发现默认使用的还是 SortMergeJoin 策略,如下图所示:
(2)执行代码,查看任务界面效果。此时发现就算是开启了自适应查询执行机制,依然还是使用的 SortMergeJoin 策略。
- 当两个表进行 join 的时候,如果一个表比较小,可以通过广播机制广播出去,这样就可以把本来是 reduce 端的 join,改为 map 端的 join,提高 join 效率。
- 如果把这个参数的值设置为 -1,表示禁用自动广播策略。
- 如果我们没有给这个参数设置值,则默认会使用 spark.sql.autoBroadcastJoinThreshold 参数的值,这个参数的值默认是 10M。那也就是说当一个表中的数据小于 10M 的时候在这里支持将这个表广播出去。
附:案例演示
1,准备测试数据
(1)首先我们编译要给用于生成测试数据的 Java 类,代码如下:
import java.io.BufferedWriter; import java.io.FileWriter; import java.io.IOException; import java.util.Random; public class GenerateJSONData { public static void main(String[] args) throws Exception{ //动态调整Join策略-生成测试数据 generateJoinData(); } private static void generateJoinData() throws IOException { //表t1 String fileName = "D:\\temp\\spark_json_t1.dat"; System.out.println("start: 开始生成文件->"+fileName); BufferedWriter bfw = new BufferedWriter(new FileWriter(fileName)); int num = 0; while(num<3000000){ bfw.write("{\"id\":\"100"+num+"\",\"score\":"+(45+new Random().nextInt(50))+"}"); bfw.newLine(); num ++; if(num%10000==0){ bfw.flush(); } } bfw.flush(); //表t2 fileName = "D:\\temp\\spark_json_t2.dat"; System.out.println("start: 开始生成文件->"+fileName); bfw = new BufferedWriter(new FileWriter(fileName)); num = 0; while(num<10000){ bfw.write("{\"id\":\"100"+num+"\",\"name\":\"zs"+num+"\",\"city\":\"bj\"}"); bfw.newLine(); num ++; if(num%10000==0){ bfw.flush(); } } while(num<3000000){ bfw.write("{\"id\":\"100"+num+"\",\"name\":\"zs"+num+"\",\"city\":\"sh\"}"); bfw.newLine(); num ++; if(num%10000==0){ bfw.flush(); } } bfw.flush(); bfw.close(); } }
(2)然后执行该程序后会在指定目录生成两个数据文件,其中 spark_json_t1.dat 大约 90M,spark_json_t2.dat 大约 146M,文件里面为 Json 格式的测试数据:
2,测试未开启动态调整 Join 策略
(1)为了对比实验,在这里我们先开发一个未开启动态调整 Join 策略的程序。
注意:因为针对动态调整 Join 策略这个功能没有单独提供参数进行控制,只要开启了自适应查询执行,当满足条件的时候,动态调整 Join 策略就会触发了。因此这里我们关闭自适应查询执行。
import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object AQEJoinScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") //获取SparkSession,为了操作SparkSQL val sparkSession = SparkSession .builder() .appName("AQEJoinScala") .config(conf) .config("spark.sql.adaptive.enabled","false") .getOrCreate() //读取Json格式数据,获取DataFrame val jsonDf1 = sparkSession.read.json("D:\\temp\\spark_json_t1.dat") val jsonDf2 = sparkSession.read.json("D:\\temp\\spark_json_t2.dat") //创建临时表 jsonDf1.createOrReplaceTempView("t1") jsonDf2.createOrReplaceTempView("t2") //执行SQL语句 val sql = """ |select | t1.id, | t2.name, | t1.score |from t1 join t2 | on t1.id = t2.id | and t2.city like 'bj' |""".stripMargin sparkSession.sql(sql).write.format("json") .save("D:\\temp\\join_"+System.currentTimeMillis()) //让程序一直运行,这样本地的Spark任务界面就可以一直查看 while (true){ ; } } }
(2)然后使用浏览器访问 http://localhost:4040 打开任务界面中查看任务执行情况:
(3)从下面这个图里面可以看出来:
- 此时JOIN的时候使用的是 SortMergeJoin 策略。这个策略是任务一开始的时候就指定好的,因为前期任务无法真正知道过滤后的 t2 表有多大,原始的 t2 表有 146M,原始的 t1 表有 90M,所以不满足广播策略的条件。
- 此外,在这里其实可以看到,真正过滤后的数据只有 139KB,所以 t2 这个表过滤后的大小是小于广播阈值的,正常情况下在 Join 的时候是应该广播出去的,这样才可以提高计算效率的。但是由于此时没有开启自适应查询执行策略,任务运行期间无法根据数据的实际大小修改执行计划。
3,测试开启动态调整 Join 策略
(1)接下来修改代码,开启动态调整 Join 策略,其实也就是开启自适应查询执行机制。
import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object AQEJoinScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") //获取SparkSession,为了操作SparkSQL val sparkSession = SparkSession .builder() .appName("AQEJoinScala") .config(conf) //.config("spark.sql.adaptive.enabled","false") .getOrCreate() //读取Json格式数据,获取DataFrame val jsonDf1 = sparkSession.read.json("D:\\temp\\spark_json_t1.dat") val jsonDf2 = sparkSession.read.json("D:\\temp\\spark_json_t2.dat") //创建临时表 jsonDf1.createOrReplaceTempView("t1") jsonDf2.createOrReplaceTempView("t2") //执行SQL语句 val sql = """ |select | t1.id, | t2.name, | t1.score |from t1 join t2 | on t1.id = t2.id | and t2.city like 'bj' |""".stripMargin sparkSession.sql(sql).write.format("json") .save("D:\\temp\\join_"+System.currentTimeMillis()) //让程序一直运行,这样本地的Spark任务界面就可以一直查看 while (true){ ; } } }
(2)执行代码,任务在刚开始运行的时候我们来查看这个执行计划,可以发现默认使用的还是 SortMergeJoin 策略,如下图所示:
(3)随着任务的进一步执行,当 Spark 获取到 t2 表过滤后的真实数据大小以后,会对之前生成的执行计划进行优化,改为使用 BroadcastHashJoin 策略。
- 因为针对 t2 表,通过 like 过滤之后剩下的真实数据只有 139KB,这个数据大小是小于 10M 的,可以广播出去提高 Join 的效率。
(4)这些数据大小在 stage 界面中也可以看到:
(5)针对这两种情况的执行效果进行对比,如下图所示,开启了自适应查询执行机制之后,会在运行期间获取到 t2 表过滤后的真实数据,从而修改之前的执行策略。
4,禁用自动广播机制
(1)当我们把 spark.sql.adaptive.autoBroadcastJoinThreshold 参数设置为 -1 的时候,可以禁用 sparksql 中的自动广播机制,就算开启了自适应查询执行机制,也无法转换为 BroadcastHashJoin 策略。
import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object AQEJoinScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") //获取SparkSession,为了操作SparkSQL val sparkSession = SparkSession .builder() .appName("AQEJoinScala") .config(conf) //.config("spark.sql.adaptive.enabled","false") //禁用自动广播机制 .config("spark.sql.adaptive.autoBroadcastJoinThreshold", "-1") .getOrCreate() //读取Json格式数据,获取DataFrame val jsonDf1 = sparkSession.read.json("D:\\temp\\spark_json_t1.dat") val jsonDf2 = sparkSession.read.json("D:\\temp\\spark_json_t2.dat") //创建临时表 jsonDf1.createOrReplaceTempView("t1") jsonDf2.createOrReplaceTempView("t2") //执行SQL语句 val sql = """ |select | t1.id, | t2.name, | t1.score |from t1 join t2 | on t1.id = t2.id | and t2.city like 'bj' |""".stripMargin sparkSession.sql(sql).write.format("json") .save("D:\\temp\\join_"+System.currentTimeMillis()) //让程序一直运行,这样本地的Spark任务界面就可以一直查看 while (true){ ; } } }
(2)执行代码,查看任务界面效果。此时发现就算是开启了自适应查询执行机制,依然还是使用的 SortMergeJoin 策略。
全部评论(0)