此文接上文 ,继续讲解 SparkSQL Hive ThriftServer 源码。
上文提到,主类 HiveThriftServer2
在启动后便会启动 ThriftCLIService
和 SparkSQLCLIService
,其中 ThriftCLIService
负责维护与客户端的连接并将客户端的请求转发至 SparkSQLCLIService
,由 SparkSQLCLIService
执行运算并把结果返回给 ThriftCLIService
,ThriftCLIService
再把结果以 ResultSet
的形式返回给客户端。两者之间的关系如下图所示:
但当下,我们并不清楚,两个 Service 之间以及 ThriftCLIService
与客户端之间是如何完成交互的。本文将先从 SparkSQLCLIService
开始,看看在这个方向上能不能找到点线索。
SparkSQLCLIService 咱直接开始看代码吧!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 private [hive] class SparkSQLCLIService (hiveContext: HiveContext ) extends CLIService with ReflectedCompositeService { override def init (hiveConf: HiveConf ) { setSuperField(this , "hiveConf" , hiveConf) val sparkSqlSessionManager = new SparkSQLSessionManager (hiveContext) setSuperField(this , "sessionManager" , sparkSqlSessionManager) addService(sparkSqlSessionManager) var sparkServiceUGI: UserGroupInformation = null if (ShimLoader .getHadoopShims.isSecurityEnabled) { try { HiveAuthFactory .loginFromKeytab(hiveConf) sparkServiceUGI = ShimLoader .getHadoopShims.getUGIForConf(hiveConf) HiveThriftServerShim .setServerUserName(sparkServiceUGI, this ) } catch { case e @ (_: IOException | _: LoginException ) => throw new ServiceException ("Unable to login to kerberos with given principal/keytab" , e) } } initCompositeService(hiveConf) } }
首先我们看到 SparkSQLCLISerivce 继承自 CLIService
,同时混入了 ReflectedCompositeService
特质。由此可见,CompositeService
应该也是 SparkSQLCLIService
的父类之一。对比于 CLIService
的 init
方法(其部分源代码已以行注释的形式在上述对应代码中给出),SparkSQLCLIService
的 init
方法可以说完全是在做一模一样的事情,不同点仅在于 CLIService
启动一个 SessionManager
,而 SparkSQLCLIService
启动了一个 SparkSQLSessionManager
。我觉得光从名字上都能判断出来,SparkSQLSessionManager
一定继承自 SessionManager
。
让我们继续一探究竟吧!
SparkSQLSessionManager 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 private [hive] class SparkSQLSessionManager (hiveContext: HiveContext ) extends SessionManager with ReflectedCompositeService { private lazy val sparkSqlOperationManager = new SparkSQLOperationManager (hiveContext) override def init (hiveConf: HiveConf ) { setSuperField(this , "hiveConf" , hiveConf) val backgroundPoolSize = hiveConf.getIntVar(ConfVars .HIVE_SERVER2_ASYNC_EXEC_THREADS ) setSuperField(this , "backgroundOperationPool" , Executors .newFixedThreadPool(backgroundPoolSize)) getAncestorField[Log ](this , 3 , "LOG" ).info( s"HiveServer2: Async execution pool size $backgroundPoolSize " ) setSuperField(this , "operationManager" , sparkSqlOperationManager) addService(sparkSqlOperationManager) initCompositeService(hiveConf) } override def openSession ( protocol: TProtocolVersion , username: String , passwd: String , sessionConf: java.util.Map [String , String ], withImpersonation: Boolean , delegationToken: String ): SessionHandle = { hiveContext.openSession() val sessionHandle = super .openSession( protocol, username, passwd, sessionConf, withImpersonation, delegationToken) val session = super .getSession(sessionHandle) HiveThriftServer2 .listener.onSessionCreated( session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername) sessionHandle } override def closeSession (sessionHandle: SessionHandle ) { HiveThriftServer2 .listener.onSessionClosed(sessionHandle.getSessionId.toString) super .closeSession(sessionHandle) sparkSqlOperationManager.sessionToActivePool -= sessionHandle hiveContext.detachSession() } }
果不其然,SparkSQLSessionManager
的 init
方法与 SessionManager
的 init
方法极为相似。从名字上看,Session Manager 当然是用来管理 Session 的了。SparkSQLSessionManager
的 openSession
和 closeSession
方法都有调用 SessionManager
的对应方法来管理 HiveSession
,同时还管理了 HiveContext
内部的 SQLSession
。简单的查看 HiveSession
和 SQLSession
的定义,可以得出结论,HiveSession
指的是 Hive ThriftServer 与 Client 之间的 Session,即通常意义上的网络 Session;而 SQLSession
指的是 SparkSQL 与 Hive ThriftServer 之间的 Session,但 SQLSession
实际存储的只是一系列与 SQL 查询有关的配置参数,和传统意义上的网络 Session 不同。
SparkSQLSessionManager 与 SessionManager 的不同点在于 SparkSQLSessionManager 启动了一个 SparkSQLOperationManager
,而 SessionManager 启动的是 OperationManager
。那么,其实也能猜到一些了。
SparkSQLOperationManager 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private [thriftserver] class SparkSQLOperationManager (hiveContext: HiveContext ) extends OperationManager with Logging { val handleToOperation = ReflectionUtils .getSuperField[JMap [OperationHandle , Operation ]](this , "handleToOperation" ) val sessionToActivePool = Map [SessionHandle , String ]() override def newExecuteStatementOperation ( parentSession: HiveSession , statement: String , confOverlay: JMap [String , String ], async: Boolean ): ExecuteStatementOperation = synchronized { val operation = new SparkExecuteStatementOperation (parentSession, statement, confOverlay)( hiveContext, sessionToActivePool) handleToOperation.put(operation.getHandle, operation) operation } }
简短,直白。很明显,newExecuteStatementOperation
方法会在客户端发送 JDBC 请求后被调用。方法创建了一个 SparkExecuteStatementOperation
,并将其进行缓存管理。实际上,SparkSQLOperationManager
只复写了 OperationManager
的 newExecuteStatementOperation
方法,除此之外 OperationManager
还有 newGetSchemasOperation
等其他方法。这些方法从命名上判断,都是用户在查询表的元数据时才会触发的操作,比如 newGetSchemasOperation
应该是会在用户试图查询某张表的模式的时候才会触发的操作。SparkSQL 之所以要重载 newExecuteStatementOperation
的原因是显然的:Execute 意味着执行,SparkSQL Hive ThriftServer 通过重载该方法,把用户通过 execQuery
发送的执行请求转发至 SparkSQL。
那就直接看看 SparkExecuteStatementOperation
到底干了什么吧(如果你已经猜到了,我并不会觉得意外 ;-) )。
SparkExecuteStatementOperation 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 private [hive] class SparkExecuteStatementOperation ( parentSession: HiveSession , statement: String , confOverlay: JMap [String , String ], runInBackground: Boolean = true )( hiveContext: HiveContext , sessionToActivePool: SMap [SessionHandle , String ] ) extends ExecuteStatementOperation (parentSession, statement, confOverlay, false ) with Logging { private var result: DataFrame = _ private var iter: Iterator [SparkRow ] = _ private var dataTypes: Array [DataType ] = _ def close (): Unit = { logDebug("CLOSING" ) } def getNextRowSet (order: FetchOrientation , maxRowsL: Long ): RowSet = { } def getResultSetSchema : TableSchema = { } def run (): Unit = { val statementId = UUID .randomUUID().toString logInfo(s"Running query '$statement '" ) setState(OperationState .RUNNING ) HiveThriftServer2 .listener.onStatementStart( statementId, parentSession.getSessionHandle.getSessionId.toString, statement, statementId, parentSession.getUsername) hiveContext.sparkContext.setJobGroup(statementId, statement) sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool => hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool" , pool) } try { result = hiveContext.sql(statement) logDebug(result.queryExecution.toString()) result.queryExecution.logical match { case SetCommand (Some ((SQLConf .THRIFTSERVER_POOL , Some (value))), _) => sessionToActivePool(parentSession.getSessionHandle) = value logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session." ) case _ => } HiveThriftServer2 .listener.onStatementParsed(statementId, result.queryExecution.toString()) iter = { val useIncrementalCollect = hiveContext.getConf("spark.sql.thriftServer.incrementalCollect" , "false" ).toBoolean if (useIncrementalCollect) { result.rdd.toLocalIterator } else { result.collect().iterator } } dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray setHasResultSet(true ) } catch { case e: Throwable => setState(OperationState .ERROR ) HiveThriftServer2 .listener.onStatementError( statementId, e.getMessage, e.getStackTraceString) logError("Error executing query:" , e) throw new HiveSQLException (e.toString) } setState(OperationState .FINISHED ) HiveThriftServer2 .listener.onStatementFinish(statementId) } }
那其实就很一目了然了:用户通过 JDBC execQuery 发送的请求最终被原封不动地转发到了 HiveContext.sql
上进行运算,结果保存在 SparkExecuteStatementOperation
中,同时保存一个 Iterator
,视客户端所需逐行逐行地以 ResultSet
的形式取出,并返回至客户端。
至此,SparkSQLCLIService 一侧的运作原理就基本探索完毕了。
总结 在深入了解过 SparkSQLCLIService
一侧的原理以后,之前那张图大概就会变成下面这个样子:
总体而言,Spark Hive ThriftServer 确实是基于 Apache Hive 的基础之上通过少量的修改、继承甚至是利用 Java 反射机制来 hack Hive 原本的类来将 Hive 本该转发至 Hadoop MapReduce 的操作转发到了 SparkSQL 的 HiveContext.sql
,因此在 JDBC 上调用 execQuery
和直接调用 HiveContext.sql
的效果是一致的。
除了 SparkSQLCLISerivce
,ThriftCLIService
侧的代码其实都是 Apache Hive 本身的代码,Spark 未对其进行任何改写。Spark Hive ThriftServer 项目本身的所有代码仅包括 SparkSQLCLIService
这一侧的代码和 Spark SQL Shell 的代码。因此总体而言,在阅读完本篇文章后,你应该已经完全了解 Spark Hive ThriftServer 的工作原理了。Hive ThriftCLIService
一侧的代码很有可能我不会再去看了,因为那一侧的代码的功能已经十分明确,但由于涉及到网络通信,毫无疑问那一侧的代码量将会是这一侧的好几倍。因此如果你只是想了解 SparkSQL Hive Server 的运作原理,你的目的已经达到了。恭喜你!