Nie&Gao
Anyway, life is elsewhere and we all know it.
##引言
接着上一篇来说[执行入口的分析][1],CliDriver
最终将用户指令command
提交给了Driver
的run
方法(针对常用查询语句而言),在这里用户的command
将会被编译,优化并生成MapReduce任务进行执行。所以Driver
也是Hive的核心,他扮演了一个将用户查询和MapReduce Task转换并执行的角色,下面我们就看看Hive是如何一步一步操作的。
##源码分析
在说run
方法之前,由于CliDriver
需要得到一个Driver
类的实例,所以首先看一下Driver
的构造方法。Driver
有三个构造函数,主要功能也就是设置类的实例变量HiveConf
。SessionState
前文已经有介绍,SessionState
返回了当前会话的一些信息,提取配置文件,初始化Driver
实例。
public Driver() {
if (SessionState.get() != null) {
conf = SessionState.get().getConf();
}
}
###run
下面就开始解析Driver
内部对用户命令command
的处理流程,首先是入口函数run
. run
函数通过调用runInternal
方法处理用户指令,在处理完成runInternal
之后,如果执行过程中出现出错,还附加了对错误码和错误信息的处理,此处省略。
```
public CommandProcessorResponse run(String command)
throws CommandNeedRetryException {
return run(command, false);
}
public CommandProcessorResponse run(String command, boolean alreadyCompiled)
throws CommandNeedRetryException {
CommandProcessorResponse cpr = runInternal(command, alreadyCompiled);
…
}
###runInternal
`runInternal`方法包含的主要操作有,处理`preRunHook`(具体功能可以顾名思义哦),`compile` , `execute`, 处理`postRunHook`以及构造`CommandProcessorResponse`并返回。下面依次从代码的角度分析这几步的具体操作:
###PreRunHook
处理`preRunHook`,首先根据配置文件和指令,构造用户Hook执行的上下文`hookContext`,然后读取用户`PreRunHook`配置指定的类(字符串), 此配置项对应于Hive配置文件当中的`“hive.exec.driver.run.hooks”`一项,利用反射机制`Class.forName`实例化`PreRunHook`类实例(`getHook`函数完成),依次执行各钩子的功能(`preDriverRun`函数完成)。
HiveDriverRunHookContext hookContext
= new HiveDriverRunHookContextImpl(conf, command);
// Get all the driver run hooks and pre-execute them.
List
###compile
编译,直接调用complieInternal
函数编译用户指令,将指令翻译成MapReduce
任务。这一个过程涉及的内容比较多,也很重要,后面将单独用一篇文章说明编译优化的过程。这里借用网上的一幅图,帮助对compile
的功能有个整体的理解,参考文献: Hive实现原理.pdf。
###execute
在运行之前还有获取锁的操作,由于新版本添加了ACID
事务的支持,还设置了事务管理器等,目前还没详细的弄懂这块的处理逻辑和功能,先放一下,主要看下execute
函数执行了什么操作,也就是如何根据编译结果执行任务的。
首先是从编译得到的查询计划QueryPlan
里获取基本的查询ID,查询字串等信息,并在回话状态中把当前查询字串和查询计划插入到历史记录中。
```
String queryId = plan.getQueryId();
String queryStr = plan.getQueryStr();
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().startQuery(queryStr,
conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
SessionState.get().getHiveHistory().logPlanProgress(plan);
}
```
与PreRunHook
类似,在执行任务之前,检查并执行用户设定的"hive.pre.exec.hooks"
,此处不再详述。完成这部操作之后,向控制台简单的打印一些信息之后,就开始正式执行任务了。
- DriverContext
创建执行上下文DriverContext,它记录的信息主要包括可执行的任务队列(Queue
public DriverContext(Context ctx) {
this.runnable = new ConcurrentLinkedQueue<Task<? extends Serializable»();
this.running = new LinkedBlockingQueue
public void prepare(QueryPlan plan) {
// extract stats keys from StatsTask
List<Task<?» rootTasks = plan.getRootTasks();
NodeUtils.iterateTask(rootTasks, StatsTask.class, new Function
- Manage and run all tasks
扯得有点远,继续看Driver
对查询任务的执行,在实例化DriverContext
对象之后,就将查询计划plan中的任务放入到DriverContext
的runnable
队列中。
for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty();
driverCxt.addToRunnable(tsk);
}
下面就开始运行任务Task,整个任务的运行由一个循环控制,只要DriverContext
没有被关闭,并且runnable
和running
队列中还有任务就一直循环。为了方便描述,下文将一次对任务循环过程的每一步进行说明,这里只给出循环判断条件。
```
while (!destroyed && driverCxt.isRunning()) {}
public synchronized boolean isRunning() { return !shutdown && (!running.isEmpty() || !runnable.isEmpty()); } ``` 1. Put all the tasks into runnable queue
在循环内部,首先不停的从runnable
队列中抽取队首的任务,然后launch
该任务。
while (!destroyed && driverCxt.isRunning()) {
// Launch upto maxthreads tasks
Task<? extends Serializable> task;
while ((task = driverCxt.getRunnable(maxthreads)) != null) {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TASK + task.getName() + "." + task.getId());
TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
if (!runner.isRunning()) {
break;
}
}
2. Launch a task
在launch一个任务的过程中,根据任务类型(是不是MapReduceTask或者ConditialTask),做一些操作(don’t know what used for),将DriverContext
当前已启动任务数curJobNo
加1,然后根据配置文件conf,查询计划plan,执行上下文cxt(DriverContext
),初始化一个任务,接着创建任务结果TaskResult
对象和任务执行对象TaskRunner
,将TaskRunner
放入DriverContext
的running
队列中,表示该任务正在运行。最后,根据配置文件指定的任务运行模式,即是否支持并行运行,启动任务。
```
private TaskRunner launchTask(Task<? extends Serializable> tsk,
String queryId, boolean noName,
String jobname, int jobs, DriverContext cxt) throws HiveException {
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().startTask(queryId, tsk,
tsk.getClass().getName());
}
if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) {
if (noName) {
conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "(" +
tsk.getId() + ")");
}
conf.set("mapreduce.workflow.node.name", tsk.getId());
Utilities.setWorkflowAdjacencies(conf, plan);
cxt.incCurJobNo(1);
console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);
}
tsk.initialize(conf, plan, cxt);
TaskResult tskRes = new TaskResult();
TaskRunner tskRun = new TaskRunner(tsk, tskRes);
cxt.launching(tskRun);
// Launch Task
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL)
&& (tsk.isMapRedTask() || (tsk instanceof MoveTask))) {
// Launch it in the parallel mode, as a separate thread only for MR tasks
//并发执行
if (LOG.isInfoEnabled()){
LOG.info("Starting task [" + tsk + "] in parallel");
}
tskRun.setOperationLog(OperationLog.getCurrentOperationLog());
tskRun.start();
} else {
if (LOG.isInfoEnabled()){
LOG.info("Starting task [" + tsk + "] in serial mode");
}
//顺序执行
tskRun.runSequential();
}
return tskRun; } ```
3. Poll a finished task
完成任务的启动之后,将调用DriverContext
的pollFinished
函数,查看任务是否执行完毕,如果有任务完成,则将该任务出队,并将已完成的任务添加到钩子上下文HookContext
中。
TaskRunner tskRun = driverCxt.pollFinished();
if (tskRun == null) {
continue;
}
hookContext.addCompleteTask(tskRun);
public synchronized TaskRunner pollFinished() throws InterruptedException {
while (!shutdown) {
Iterator<TaskRunner> it = running.iterator();
while (it.hasNext()) {
TaskRunner runner = it.next();
if (runner != null && !runner.isRunning()) {
it.remove();
return runner;
}
}
wait(SLEEP_TIME);
}
return null;
}
4. Handle the finished task
针对一个已完成的任务,首先获取任务的结果对象TaskResult
和退出状态, 如果任务非正常退出,则第一步先判断任务是否支持Retry
,如果支持,关闭当前DriverContext
,设置jobTracker
为初始状态,抛出CommandNeedRetry
异常,这个异常会在CliDriver
的processLocalCmd
中捕获,然后尝试重新处理该命令,参见上一篇文章的说明。如果任务不支持Retry
,则启动备份任务backupTask
(类似于回滚?),并添加到runnable
队列,在下次循环过程中执行。如果没有backupTask
,则查找用户配置“hive.exec.failure.hooks”
,根据用户配置相应出错处理,并关闭DriverContext
, 返回退出码。
```
Task<? extends Serializable> tsk = tskRun.getTask();
TaskResult result = tskRun.getTaskResult();
int exitVal = result.getExitVal(); if (exitVal != 0) { if (tsk.ifRetryCmdWhenFail()) { driverCxt.shutdown(); // in case we decided to run everything in local mode, restore the // the jobtracker setting to its initial value ctx.restoreOriginalTracker(); throw new CommandNeedRetryException(); } Task<? extends Serializable> backupTask = tsk.getAndInitBackupTask(); if (backupTask != null) { setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk); console.printError(errorMessage); errorMessage = “ATTEMPT: Execute BackupTask: “ + backupTask.getClass().getName(); console.printError(errorMessage);
// add backup task to runnable
if (DriverContext.isLaunchable(backupTask)) {
driverCxt.addToRunnable(backupTask);
}
continue;
} else {
hookContext.setHookType(HookContext.HookType.ON_FAILURE_HOOK);
// Get all the failure execution hooks and execute them.
for (Hook ofh : getHooks(HiveConf.ConfVars.ONFAILUREHOOKS)) {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName());
((ExecuteWithHookContext) ofh).run(hookContext);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName());
}
setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
SQLState = "08S01";
console.printError(errorMessage);
driverCxt.shutdown();
// in case we decided to run everything in local mode, restore the
// the jobtracker setting to its initial value
ctx.restoreOriginalTracker();
return exitVal;
} } ``` **5. Find children tasks**
最后调用DriverContext
的finished
函数,对完成的任务进行处理(处理逻辑没看懂), 然后判断当前任务是否包含子任务,如果包含则依次将子任务添加到runnable
队列,下次循环中被启动执行。
```
driverCxt.finished(tskRun);
if (tsk.getChildTasks() != for (Task<? extends Serializable> child : tsk.getChildTasks()) { if (DriverContext.isLaunchable(child)) { driverCxt.addToRunnable(child); } } } ```
6. Do something before return
当所有的任务都完成之后,如果发现DriverContext
已经被关闭,表明任务取消,打印信息并返回对应的状态码。最后清楚任务执行中不完整的输出,并加载执行用户指定的"hive.exec.post.hooks"
,完成对应的钩子功能。对于执行过程中出现的异常,CommandNeedRetryException
将会直接向上抛出,其他Exception
,直接打印出错信息。无论是否发生异常,只要能够获取到任务执行过程中的MapReduce状态信息,都将在finally语句块中打印。(限于篇幅,此处只给出部分代码,钩子的处理方式前文已经给出不再详述,异常处理的部分,有兴趣的执行查看)
```
//判断DriverContext是否被关闭
if (driverCxt.isShutdown()) {
SQLState = “HY008”;
errorMessage = “FAILED: Operation cancelled”;
console.printError(errorMessage);
return 1000;
}
//删除不完整的输出
HashSet
for (WriteEntity output : remOutputs) { plan.getOutputs().remove(output); }
最后的最后,如果所有的任务都正常执行完毕,此次查询完成,plan.setDone(),打印OK~
###PostRunHook and return
还没完~当`execute`函数执行完成后,返回到`runInternal`函数中,接着释放锁,与之前的`PreRunHook`相对应,还需要加载相应用户自定义的`PostRunHook`(代码不再重复),最后才调用`creatProcessorResponse`,创建响应对象`CommandProcessorResponse`并返回。
private CommandProcessorResponse createProcessorResponse(int ret) { return new CommandProcessorResponse(ret, errorMessage, SQLState, downstreamError); }
```
想更一进步的支持我,请扫描下方的二维码,你懂的~