在上一篇文章 中,我们了解了 SparkSQL 查询的基本执行过程,并了解到 SQLContext
的内部类 QueryExecution
包含了整个执行过程的每一个执行步骤。
在这篇文章中,我将开始讲解 SQL 语句如何通过 Parser 转变为 Unresolved Logical Plan。
DDLParser 我们回到 SQLContext#parseSql
方法:
1 2 3 4 5 6 7 @transient protected [sql] val ddlParser = new DDLParser (sqlParser.parse(_))@transient protected [sql] val sqlParser = new SparkSQLParser (getSQLDialect().parse(_))protected [sql] def parseSql (sql: String ): LogicalPlan = ddlParser.parse(sql, false )
可以看到,parseSql
方法调用了 ddlParser
的 parse
方法。ddlParser
在初始化时传入了 sqlParser.parse
方法作为参数,而 sqlParser
在初始化时也传入了一个 SQL 方言的 parse
方法作为参数。这三个 parse
之间很有可能是一个 fallback
的关系。那我们先来看看 DDLParser
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private [sql] class DDLParser (parseQuery: String => LogicalPlan ) extends AbstractSparkSQLParser with DataTypeParser with Logging { def parse (input: String , exceptionOnError: Boolean ): LogicalPlan = { try { parse(input) } catch { case ddlException: DDLException => throw ddlException case _ if !exceptionOnError => parseQuery(input) case x: Throwable => throw x } } }
先不急着往下看,因为这里调用了 AbstractSparkSQLParser
的 parse
方法。我们先看看 AbstractSparkSQLParser
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 private [sql] abstract class AbstractSparkSQLParser extends StandardTokenParsers with PackratParsers { def parse (input: String ): LogicalPlan = { initLexical phrase(start)(new lexical.Scanner (input)) match { case Success (plan, _) => plan case failureOrError => sys.error(failureOrError.toString) } } protected lazy val initLexical: Unit = lexical.initialize(reservedWords) protected case class Keyword (str: String ) { def normalize : String = lexical.normalizeKeyword(str) def parser : Parser [String ] = normalize } protected implicit def asParser (k: Keyword ): Parser [String ] = k.parser protected lazy val reservedWords: Seq [String ] = this .getClass .getMethods .filter(_.getReturnType == classOf[Keyword ]) .map(_.invoke(this ).asInstanceOf[Keyword ].normalize) override val lexical = new SqlLexical protected def start : Parser [LogicalPlan ] protected lazy val wholeInput: Parser [String ] = new Parser [String ] { def apply (in: Input ): ParseResult [String ] = Success (in.source.toString, in.drop(in.source.length())) } protected lazy val restInput: Parser [String ] = new Parser [String ] { def apply (in: Input ): ParseResult [String ] = Success ( in.source.subSequence(in.offset, in.source.length()).toString, in.drop(in.source.length())) } }
我们看到,真正启动 parse
过程的实际上是如下代码块:
1 2 3 4 phrase(start)(new lexical.Scanner (input)) match { case Success (plan, _) => plan case failureOrError => sys.error(failureOrError.toString) }
这里调用的 phrase
方法实际上来自于 AbstractSparkSQLParser
的父类 PackratParsers
。PackratParsers
和 StandardTokenParsers
实际上都是 Scala 自带的类。它们的功能较为复杂,而且 SparkSQL 本身的作用原理关系并不是很大,我在这里就简单讲述一下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 override def phrase [T ](p: Parser [T ]) = { val q = super .phrase(p) new PackratParser [T ] { def apply (in: Input ) = in match { case in: PackratReader [_] => q(in) case in => q(new PackratReader (in)) } } }
可以看到,PackratParsers#phrase
方法接受一个 Parser
作为参数,并以其为参数调用了其父类 Parsers
的 phrase
方法,该方法同样返回一个 Parser
。而后,PackratParsers#phrase
返回了一个 PackratParser
,由 AbstractSparkSQLParser
调用这个对象的 apply
方法传入 SQL 语句。
我们回到 DDLParser
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private [sql] class DDLParser (parseQuery: String => LogicalPlan ) extends AbstractSparkSQLParser with DataTypeParser with Logging { def parse (input: String , exceptionOnError: Boolean ): LogicalPlan = { } protected val CREATE = Keyword ("CREATE" ) protected val TEMPORARY = Keyword ("TEMPORARY" ) protected val TABLE = Keyword ("TABLE" ) protected val IF = Keyword ("IF" ) protected val NOT = Keyword ("NOT" ) protected val EXISTS = Keyword ("EXISTS" ) protected val USING = Keyword ("USING" ) protected val OPTIONS = Keyword ("OPTIONS" ) protected val DESCRIBE = Keyword ("DESCRIBE" ) protected val EXTENDED = Keyword ("EXTENDED" ) protected val AS = Keyword ("AS" ) protected val COMMENT = Keyword ("COMMENT" ) protected val REFRESH = Keyword ("REFRESH" ) protected lazy val ddl: Parser [LogicalPlan ] = createTable | describeTable | refreshTable protected def start : Parser [LogicalPlan ] = ddl }
在接下来的代码中,AbstractSparkSQLParser
实现了三个 parser:createTable
、 describeTable
和 refreshTable
,并将其重载为 AbstractSparkSQLParser#start
变量,由此 DDLParser
改变了 AbstractSparkSQLParser#start
的功能。
上述的这些 Keyword 全都是 Spark 所支持的 DLL keyword,没有包含 SQL 的保留字。不难想象 DDLParser
仅用于解析 DDL 语句,当遇到 SQL 语句时,解析器将 fallback 到实例化 DDLTask
时传入的 parseQuery
函数,而这个函数正是 SparkSQLParser#parse
函数。
通过查看 SparkSQLParser
的源代码,可以有如下发现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private [sql] class SparkSQLParser (fallback: String => LogicalPlan ) extends AbstractSparkSQLParser { protected val AS = Keyword ("AS" ) protected val CACHE = Keyword ("CACHE" ) protected val CLEAR = Keyword ("CLEAR" ) protected val IN = Keyword ("IN" ) protected val LAZY = Keyword ("LAZY" ) protected val SET = Keyword ("SET" ) protected val SHOW = Keyword ("SHOW" ) protected val TABLE = Keyword ("TABLE" ) protected val TABLES = Keyword ("TABLES" ) protected val UNCACHE = Keyword ("UNCACHE" ) }
从注释上看,SparkSQLParser
用于解析所有 SparkSQL 所支持的 SQL 方言所共有的关键字。当该解析器失败时,将会继续 fallback 到当时在 SQLContext
传入的 getSQLDialect().parse(_)
,使用某个特定的 SQL 方言进行解析。
总结 个人认为大多数人应该不会太在意 Parser 的原理,毕竟没什么人会需要去修改 SparkSQL 语句的解析逻辑,因此这一篇文章只能算是抛砖引玉,真正的解析逻辑还有待你们自己去发掘。在下一篇文章中我会先为大家讲解一下 LogicalPlan 的数据结构,敬请期待。