在上一篇文章 中,我们了解了 SparkSQL 查询的基本执行过程,并了解到 SQLContext 的内部类 QueryExecution 包含了整个执行过程的每一个执行步骤。
在这篇文章中,我将开始讲解 SQL 语句如何通过 Parser 转变为 Unresolved Logical Plan。
DDLParser 我们回到 SQLContext#parseSql 方法:
1 2 3 4 5 6 7 @transient protected [sql] val  ddlParser = new  DDLParser (sqlParser.parse(_))@transient protected [sql] val  sqlParser = new  SparkSQLParser (getSQLDialect().parse(_))protected [sql] def  parseSql String ): LogicalPlan  = ddlParser.parse(sql, false )
可以看到,parseSql 方法调用了 ddlParser 的 parse 方法。ddlParser 在初始化时传入了 sqlParser.parse 方法作为参数,而 sqlParser 在初始化时也传入了一个 SQL 方言的 parse 方法作为参数。这三个 parse 之间很有可能是一个 fallback 的关系。那我们先来看看 DDLParser:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private [sql] class  DDLParser (parseQuery: String  => LogicalPlan  )  extends  AbstractSparkSQLParser  with  DataTypeParser  with  Logging  {   def  parse String , exceptionOnError: Boolean ): LogicalPlan  = {     try  {              parse(input)     } catch  {       case  ddlException: DDLException  => throw  ddlException        	         case  _ if  !exceptionOnError => parseQuery(input)       case  x: Throwable  => throw  x     }   }       } 
先不急着往下看,因为这里调用了 AbstractSparkSQLParser 的 parse 方法。我们先看看 AbstractSparkSQLParser:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 private [sql] abstract  class  AbstractSparkSQLParser   extends  StandardTokenParsers  with  PackratParsers  {   def  parse String ): LogicalPlan  = {          initLexical 	     phrase(start)(new  lexical.Scanner (input)) match  {       case  Success (plan, _) => plan       case  failureOrError => sys.error(failureOrError.toString)     }   }      protected  lazy  val  initLexical: Unit  = lexical.initialize(reservedWords)   protected  case  class  Keyword (str: String  )      def  normalize String  = lexical.normalizeKeyword(str)     def  parser Parser [String ] = normalize   }   protected  implicit  def  asParser Keyword ): Parser [String ] = k.parser      protected  lazy  val  reservedWords: Seq [String ] =     this .getClass       .getMethods       .filter(_.getReturnType == classOf[Keyword ])       .map(_.invoke(this ).asInstanceOf[Keyword ].normalize)         override  val  lexical = new  SqlLexical    protected  def  start Parser [LogicalPlan ]      protected  lazy  val  wholeInput: Parser [String ] = new  Parser [String ] {     def  apply Input ): ParseResult [String ] =       Success (in.source.toString, in.drop(in.source.length()))   }      protected  lazy  val  restInput: Parser [String ] = new  Parser [String ] {     def  apply Input ): ParseResult [String ] =       Success (         in.source.subSequence(in.offset, in.source.length()).toString,         in.drop(in.source.length()))   } } 
我们看到,真正启动 parse 过程的实际上是如下代码块:
1 2 3 4 phrase(start)(new  lexical.Scanner (input)) match  {   case  Success (plan, _) => plan   case  failureOrError => sys.error(failureOrError.toString) } 
这里调用的 phrase 方法实际上来自于 AbstractSparkSQLParser 的父类 PackratParsers。PackratParsers 和 StandardTokenParsers 实际上都是 Scala 自带的类。它们的功能较为复杂,而且 SparkSQL 本身的作用原理关系并不是很大,我在这里就简单讲述一下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 override  def  phrase T ](p: Parser [T ]) = {  val  q = super .phrase(p)   new  PackratParser [T ] {     def  apply Input ) = in match  {       case  in: PackratReader [_] => q(in)       case  in => q(new  PackratReader (in))     }   } } 
可以看到,PackratParsers#phrase 方法接受一个 Parser 作为参数,并以其为参数调用了其父类 Parsers 的 phrase 方法,该方法同样返回一个 Parser。而后,PackratParsers#phrase 返回了一个 PackratParser,由 AbstractSparkSQLParser 调用这个对象的 apply 方法传入 SQL 语句。
我们回到 DDLParser:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private [sql] class  DDLParser (parseQuery: String  => LogicalPlan  )  extends  AbstractSparkSQLParser  with  DataTypeParser  with  Logging  {   def  parse String , exceptionOnError: Boolean ): LogicalPlan  = {        }      protected  val  CREATE  = Keyword ("CREATE" )   protected  val  TEMPORARY  = Keyword ("TEMPORARY" )   protected  val  TABLE  = Keyword ("TABLE" )   protected  val  IF  = Keyword ("IF" )   protected  val  NOT  = Keyword ("NOT" )   protected  val  EXISTS  = Keyword ("EXISTS" )   protected  val  USING  = Keyword ("USING" )   protected  val  OPTIONS  = Keyword ("OPTIONS" )   protected  val  DESCRIBE  = Keyword ("DESCRIBE" )   protected  val  EXTENDED  = Keyword ("EXTENDED" )   protected  val  AS  = Keyword ("AS" )   protected  val  COMMENT  = Keyword ("COMMENT" )   protected  val  REFRESH  = Keyword ("REFRESH" )   protected  lazy  val  ddl: Parser [LogicalPlan ] = createTable | describeTable | refreshTable   protected  def  start Parser [LogicalPlan ] = ddl          } 
在接下来的代码中,AbstractSparkSQLParser 实现了三个 parser:createTable 、 describeTable 和 refreshTable,并将其重载为 AbstractSparkSQLParser#start 变量,由此 DDLParser 改变了 AbstractSparkSQLParser#start 的功能。
上述的这些 Keyword 全都是 Spark 所支持的 DLL keyword,没有包含 SQL 的保留字。不难想象 DDLParser 仅用于解析 DDL 语句,当遇到 SQL 语句时,解析器将 fallback 到实例化 DDLTask 时传入的 parseQuery 函数,而这个函数正是 SparkSQLParser#parse 函数。
通过查看 SparkSQLParser 的源代码,可以有如下发现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private [sql] class  SparkSQLParser (fallback: String  => LogicalPlan  ) extends  AbstractSparkSQLParser     	   protected  val  AS  = Keyword ("AS" )   protected  val  CACHE  = Keyword ("CACHE" )   protected  val  CLEAR  = Keyword ("CLEAR" )   protected  val  IN  = Keyword ("IN" )   protected  val  LAZY  = Keyword ("LAZY" )   protected  val  SET  = Keyword ("SET" )   protected  val  SHOW  = Keyword ("SHOW" )   protected  val  TABLE  = Keyword ("TABLE" )   protected  val  TABLES  = Keyword ("TABLES" )   protected  val  UNCACHE  = Keyword ("UNCACHE" )          } 
从注释上看,SparkSQLParser 用于解析所有 SparkSQL 所支持的 SQL 方言所共有的关键字。当该解析器失败时,将会继续 fallback 到当时在 SQLContext 传入的 getSQLDialect().parse(_),使用某个特定的 SQL 方言进行解析。
总结 个人认为大多数人应该不会太在意 Parser 的原理,毕竟没什么人会需要去修改 SparkSQL 语句的解析逻辑,因此这一篇文章只能算是抛砖引玉,真正的解析逻辑还有待你们自己去发掘。在下一篇文章中我会先为大家讲解一下 LogicalPlan 的数据结构,敬请期待。