Spark Catalyst 进阶:Join
在之前的文章中,我们已经了解了 SparkSQL 把 SQL 语句变为 SparkJob 的过程。这个过程我们只是做了一个 Overview,具体不同的语句会变为怎样的 Job 我们并未一一列举。实际上列举起来是一件相当大工程的事。
在那么多的 SQL 操作中,有那么一个操作十分常用,但又十分耗时,那就是 Join 操作。在这篇文章里,我们将深入探讨 SparkSQL 会对不同的 Join 做出怎样的操作。
什么是 Join ?
在 SQL 中,Join 用于根据两个或多个表中的列之间的关系,从这些表中查询数据。表达 Join 的方式有两种:
1 | SELECT Persons.LastName, Persons.FirstName, Orders.OrderNo |
实际上,第一种方式更像是 SQL 的语法糖,理论上而言我们更偏向后一种写法。这种使用关键字 JOIN
的规范写法使用 ON
关键字表明了 Join 的条件,同时在 JOIN
前面加上了一个 INNER
来表明要执行的 Join 的类型。SparkSQL 支持的 SQL 操作有以下几种:
Join 类型 | 效果 |
---|---|
Inner Join | 使用比较运算符根据每个表共有的列的值匹配两个表中的行 |
Left Semi Join | 对于左表的每个键值,在右表中找到第一个匹配的键值便返回 |
Left Outer Join | 左向外联接的结果集包括 LEFT OUTER 子句中指定的左表的所有行,而不仅仅是联接列所匹配的行。如果左表的某行在右表中没有匹配行,则在相关联的结果集行中右表的所有选择列表列均为空值 |
Right Outer Join | 右向外联接是左向外联接的反向联接。将返回右表的所有行。如果右表的某行在左表中没有匹配行,则将为左表返回空值 |
Full Outer Join | 完整外部联接返回左表和右表中的所有行。当某行在另一个表中没有匹配行时,则另一个表的选择列表列包含空值。如果表之间有匹配行,则整个结果集行包含基表的数据值 |
接下来我们就开始看看 SparkSQL 会怎么处理这些 JOIN 语句。
Parser
首先 JOIN 语句要变成 Logical Plan 就需要先经过 Parser。根据我们之前学习过的内容来判断,JOIN 语句相关的解析规则在 SqlParser
类中:
1 | class SqlParser extends AbstractSparkSQLParser with DataTypeParser { |
由此,输入到 SparkSQL 中的 SQL 语句与 Join 类型的关系可以总结如下:
Join 类型 | SQL 语句 |
---|---|
Inner Join | SELECT ... FROM table1, table2[, ...] ... SELECT ... FROM ... JOIN ... [ON ...] |
Left Semi Join | SELECT ... FROM ... LEFT SEMI JOIN ... [ON ...] |
Left Outer Join | SELECT ... FROM ... LEFT [OUTER] JOIN ... [ON ...] |
Right Outer Join | SELECT ... FROM ... RIGHT [OUTER] JOIN ... [ON ...] |
Full Outer Join | SELECT ... FROM ... FULL [OUTER] JOIN ... [ON ...] |
接下来我们来看一下表示 Logical Plan 的 Join
类:
1 | case class Join( |
Join 的 Logical Plan 本身只有一个类,显得十分简单。
Analyzer
在通过 Parser 得到 Unresolved Logical Plan 以后,下一步就轮到 Analyzer 了。经过之前的学习,我们知道 Analyzer 所应用的全部规则都位于 Analyzer.scala
中:
1 | class Analyzer( |
看起来,Analyzer 对 Join 树做的操作仅在于解决一些很奇怪的属性冲突。这种问题属于少数派,相信大多数时候 SparkSQL 都不会进入这个分支。
Optimizer
接下来我们来看一下 Optimizer 是否有与 Join 相关的优化逻辑:
1 | // Join 首先出现在了这个 Rule 中 |
由此可见,Optimizer 对 Join 操作做出的优化,在于将 SELECT 以及 ON 所包含的属性考虑进去后,将左右子树中不需要的属性先删去再 Join,以此来优化 Join 的性能。
至此,Logical Plan 的处理过程就全部完成了。接下来就是重中之重了。
Planner
我们知道,Planner 将 Optimized Logical Plan 变为 Physical Plan 的规则全都位于 SparkStrategies
类中,那我们直接看吧:
1 | private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { |
通过阅读上述代码,我们找到了如下几个与 JOIN 有关的 SparkPlan:
- `BroadcastLeftSemiJoinHash`:Left Semi Join,ON 中存在相等条件,右子树小于阈值(默认 10MB)
- `LeftSemiJoinHash`:Left Semi Join,ON 中存在相等条件
- `LeftSemiJoinBNL`:Left Semi Join
- `BroadcastHashJoin`:Inner Join,ON 里有相等条件,左子树或右子树小于阈值(默认 10MB)。以较小的一侧为 BuildSide
- `SortMergeJoin`:Inner Join,ON 里有相等条件,sortMergeJoin 设置被开启
- `ShuffledHashJoin`:Inner Join,ON 里有相等条件。以较小的一侧为 buildSide。
- `HashOuterJoin`:ON 里有相等条件
- `CartesianProduct`:Inner Join,有 ON 语句
- `CartesianProduct`:没有 ON 语句
- `BroadcastNestedLoopJoin`:剩下的都是它
足足 10 种用于 Join 的 Physical Plan。看来 SparkSQL 也知道这是最关键的操作。接下来我们逐个解析这些 Plan。
Physical Plan
BroadcastLeftSemiJoinHash
准入条件:Left Semi Join,ON 中存在相等条件,右子树小于阈值(默认 10MB)
1 | case class BroadcastLeftSemiJoinHash( |
好,在看之前我们先看看 HashJoin
:
1 | trait HashJoin { |
好,我们再回到 BroadcastLeftSemiJoinHash
:
1 | case class BroadcastLeftSemiJoinHash( |
首先,在实例化结果 RDD 的时候,右子树的结果就已经计算完毕并被收集回来,将右子树的 Row 变为 key 并放入 HashSet 再广播出去的动作将由 Master 独自完成。在结果 RDD 的 collect
或其他方法被调用的时候,左子树的每个 Partition 同样会将自己的 Row 变为 key,并与之前广播的 HashSet 中的元素进行比对,返回 key 存在于 HashSet 中的记录。
RDD 的计算本该是 lazy 的。诚然,这里左子树的计算确实是 lazy 的,但右子树不是,右子树在 RDD 实例化的时候就已经计算完毕了,因此该方法不太适用于较大的右子树。不过,能产生这种 SparkPlan 本来就要求 LeftSemiJoin 操作右子树的 Statistics 值小于一定的阈值,因此这样做还是合理的。
LeftSemiJoinHash
准入条件:Left Semi Join,ON 中存在相等条件
1 | case class LeftSemiJoinHash( |
可见,其核心算法本身和 BroadcastLeftSemiJoinHash
并无不同,但却使用了 zipPartitions
方法来计算两个 RDD 的 Join 结果。如果要确保结果完全正确,就需要两个 RDD 的 Partition 数相同,同时在 key 上有着相同值的 Row 必然处于 index 相同的 Partition 内。我暂时无法理解 SparkSQL 要如何保证这两个条件同时满足,只能先放一放了。
LeftSemiJoinBN
准入条件:Left Semi Join
1 | case class LeftSemiJoinBNL(streamed: SparkPlan, broadcast: SparkPlan, condition: Option[Expression]) |
没什么特别,相当好理解。
BroadcastHashJoin
准入条件:Inner Join,ON 里有相等条件,左子树或右子树小于阈值(默认 10MB),以较小的一侧为 BuildSide
1 | case class BroadcastHashJoin( |
我们看到,在最后 BroadcastHashJoin
调用了父类 HashJoin
的 hashJoin
方法。我们来看看那个方法:
1 | trait HashJoin { |
嗯,这背后确实是个标准的 Hash Join 算法,但我必须得说,这写得实在是太巧妙了。
BroadcastHashJoin
实际上和 BroadcastLeftSemiJoinHash
很像,但后者的 buildSide 结果的收集是在 doExecute
被调用时进行,而前者在实例化时就已经以一个异步计算的形式开始了。考虑到 SparkSQL 的各种 lazy 变量,实际上前者的计算的启动时机比后者要早很多。前者在 planner.plan
的时候就已经开始了,而后者则要等到 QueryExecution#toRDD
。
SortMergeJoin
准入条件:Inner Join,ON 里有相等条件,sortMergeJoin 设置被开启
1 | case class SortMergeJoin( |
虽然算法不相同,但迭代器的设计思想上,SortMergeJoin
和 BroadcastHashJoin
还是很像的,只是前者的迭代器在 next
方法里调用了 hasNext
,这样的设计更为安全,而后者如果在 next
之前没有调用过 hasNext
则会直接出错。
ShuffledHashJoin
准入条件:Inner Join,ON 里有相等条件。以较小的一侧为 buildSide。
1 | case class ShuffledHashJoin( |
好像没什么需要说的,十分直观。
HashOuterJoin
准入条件:ON 里有相等条件
1 |
|
看起来有点费劲,写得很是不面向对象,但总体来说并没有什么特别深奥的地方,慢慢看还是可以看得懂的。
CartesianProduct
准入条件:Inner Join,有 ON 语句;没有 ON 语句
1 | case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode { |
其实就是 RDD 的 cartesian
了。
BroadcastNestedLoopJoin
准入条件:剩下的所有 Join。以较小一侧为 buildSide。
1 | case class BroadcastNestedLoopJoin( |
也算是比较直观啦,并没有什么特别神奇的东西。至此,我们就探索完 SparkSQL 为 JOIN 操作设计的 9 种 Physical Plan 了,相信在这个操作上对 SparkSQL 知根知底为以后的工作也能带来莫大的好处。
Spark Catalyst 进阶:Join