2021SC@SDUSC
Hadoop yarn源码分析(八) AsyncDispatcher事件异步调度器 2021SC@SDUSC
- 一、AsyncDispatcher 概述
- 二、AsyncDispatcher 属性
- 2.1 AsyncDispatcher成员变量
- 2.2 AsyncDispatcher构造器
- 三、AsyncDispatcher 源码分析
- 3.1 serviceInit()方法
- 3.2 serviceStart()方法
- 3.3 run()方法
- 3.3 dispatch()方法
- 3.5 serviceStop()方法
一、AsyncDispatcher 概述
作为yarn中的事件异步调度器,AsyncDispatcher是RM中基于阻塞队列的调度事件的组件,它在一个特定的单线程中分派事件,并将分派的事件交给AsyncDispatcher中已经注册的对应的EventHandler事件处理器来处理。可以用一个线程池来调度时间,从而完成对多个事件的处理。
处理过程如下:处理请求进入系统后,由AsyncDispatcher(异步调度器)传递给相应的EventHandler(事件处理器)。之后,该事件处理器可能将事件转发给另外一个事件处理器,也有可能交给一个含有限状态机的事件处理器,最终的处理结果以事件的形式传送给AsyncDispatcher。新的事件会被AsyncDispatcher处理后再次转发,直到处理完成。
在基于事件驱动的编程模型中,所有对象被抽象成了事件处理器,而事件处理器之间又通过事件相互关联。每种事件处理器处理一种类型的事件, 同时根据需要触发另外一种事件。
二、AsyncDispatcher 属性
2.1 AsyncDispatcher成员变量
AsyncDispatcher中还有一些标志位,如下:
1.stopped:是否停止的标志位
2.drainEventsOnStop:在stop功能中开启/禁用流尽分发器事件的配置标志位。如果启动成功,则AsyncDispatcher停止前需要先处理完eventQueue中的事件,否则直接停止。
3.drained:stop功能中所有剩余分发器事件已经被处理或流尽的标志位
4.waitForDrained:drain标志位上的等待锁。
5.blockNewEvents:在AsyncDispatcher停止过程中阻塞新近到来的事件进入队列的标志位,仅当drainEventsOnStop启用(即为true)时有效。
6.exitOnDispatchException:确保调度程序崩溃,但不做系统退出system-exit的标志位。
//package org.apache.hadoop.yarn.event.AsyncDispatcher.java
//待处理事务阻塞队列
private final BlockingQueue<Event> eventQueue;
//AsyncDispatcher标志位,是否停止
private volatile boolean stopped = false;
//控制详细信息队列事件打印的配置
private int detailsInterval;
private boolean printTrigger = false;
//在stop功能中开启/禁用流尽分发器事件的配置标志位
private volatile boolean drainEventsOnStop = false;
//stop功能中所有剩余分发器事件已经被处理或流尽的标志位
private volatile boolean drained = true;
//drained的等待锁
private final Object waitForDrained = new Object();
// 在AsyncDispatcher停止过程中阻塞新近到来的事件进入队列的标志位,仅当drainEventsOnStop启用(即为true)时有效
private volatile boolean blockNewEvents = false;
//实例
private final EventHandler<Event> handlerInstance = new GenericEventHandler();
//事件调度线程
private Thread eventHandlingThread;
//事件类型枚举类Enum到事件处理器EventHandler实例的映射集合
protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
//确保调度程序崩溃,但不做系统退出system-exit
private boolean exitOnDispatchException = true;
2.2 AsyncDispatcher构造器
无参构造: AsyncDispatcher中的eventQueue,是一个阻塞队列,默认的实现为线程安全的链式阻塞队列LinkedBlockingQueue,这在其无参构造方法中有体现。
有参构造:初始化eventDispatchers集合为HashMap,专门存储枚举Enum类型事件至事件处理器EventHandler实例的映射关系。所有被调度器分发的事件,都必须按照eventDispatchers来注册一个事件处理器EventHandler,若未注册,则调度器不会分发事件。
//无参构造
public AsyncDispatcher() {
//调用有参构造,传入LinkedBlockingQueue实例
this(new LinkedBlockingQueue<Event>());
}
//有参构造
public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
super("Dispatcher");
this.eventQueue = eventQueue;
this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
this.eventTypeMetricsMap = new HashMap<Class<? extends Enum>,
EventTypeMetrics>();
}
三、AsyncDispatcher 源码分析
线程启动
3.1 serviceInit()方法
取参数yarn.dispatcher.exit-on-error, 参数未配置默认为false
@Override
protected void serviceInit(Configuration conf) throws Exception{
super.serviceInit(conf);
this.detailsInterval = getConfig().getInt(YarnConfiguration.
YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD,
YarnConfiguration.
DEFAULT_YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD);
}
3.2 serviceStart()方法
创建一个事件调度线程eventHandlingThread
,并启动线程。
@Override
protected void serviceStart() throws Exception {
//启动
super.serviceStart();
eventHandlingThread = new Thread(createThread());
eventHandlingThread.setName(dispatcherThreadName);
//启动事件调度线程eventHandlingThread
eventHandlingThread.start();
}
3.3 run()方法
eventHandlingThread由createThread()方法来定义的,并在其中有一个run()方法。
run方法中的while循环,判断标志位stopped是否为false,即当AsyncDispatcher未停止且当前线程并未中断的时候,一直运行。
先将标志位drained赋值为eventQueue是否为空,若停止过程中阻止新的事件加入队列,将标志位blockNewsEvents设为true,若待处理的事件都已调度完,则将drained赋值为true,调用waitForDrained的notify()方法通知等待者。
当遇到正常时事件event时,用take()方法取走blockingQueue中首位对象,若blockingQueue为空,则阻塞,并等待至有新的数据加入,并调用dispatch()方法分发事件。
Runnable createThread() {
return new Runnable() {
@Override
public void run() {
//标志位stopped为false,且当前线程未中断的话,一直运行
while (!stopped && !Thread.currentThread().isInterrupted()) {
//判断事件调度队列是否为空,并将值赋给标志位drained
drained = eventQueue.isEmpty();
//如果停止过程中阻止新的事件加入待处理队列,即标志位blockNewEvents为true
if (blockNewEvents) {
synchronized (waitForDrained) {
if (drained) {
waitForDrained.notify();
}
}
}
Event event;
try {
//从eventQueue中取出一个事件
event = eventQueue.take();
} catch(InterruptedException ie) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", ie);
}
return;
}
if (event != null) {
if (eventTypeMetricsMap.
get(event.getType().getDeclaringClass()) != null) {
long startTime = clock.getTime();
dispatch(event);
eventTypeMetricsMap.get(event.getType().getDeclaringClass())
.increment(event.getType(),
clock.getTime() - startTime);
} else {
//调用dispatch()方法进行分发调度
dispatch(event);
}
if (printTrigger) {
//记录可能会导致队列中的事件过多的最新的调度事件类型
LOG.info("Latest dispatch event type: " + event.getType());
printTrigger = false;
}
}
}
}
};
}
3.3 dispatch()方法
@SuppressWarnings("unchecked")
protected void dispatch(Event event) {
//所有事件都通过此循环
LOG.debug("Dispatching the event {}.{}", event.getClass().getName(),
event);
//根据event确定事件类型枚举类
Class<? extends Enum> type = event.getType().getDeclaringClass();
try{
//根据事件类型枚举类type,从eventDispatchers中获取事件处理器EventHandler实例handler
EventHandler handler = eventDispatchers.get(type);
if(handler != null) {
handler.handle(event);
} else {
throw new Exception("No handler for registered for " + type);
}
} catch (Throwable t) {
//将队列的状态记入日志中
LOG.error(FATAL, "Error in dispatcher thread", t);
// 如果调用serviceStop,退出该线程.
if (exitOnDispatchException
&& (ShutdownHookManager.get().isShutdownInProgress()) == false
&& stopped == false) {
stopped = true;
Thread shutDownThread = new Thread(createShutDownThread());
shutDownThread.setName("AsyncDispatcher ShutDown handler");
shutDownThread.start();
}
}
}
3.5 serviceStop()方法
先对标志位drainEventsOnStop进行判断,若为true,则AsyncDispatcher需要在挺值钱处理完待调度处理队列eventQueue中事件,需要先将标志位blockNewEvents设置为true,阻止新的事件加入,记录info的日志信息。waitForDrained上通过synchronized进行同步:若队列中的事件还没处理完,同时eventHandlingThread线程仍然存活,waitForDrained释放锁,调用wait方法,等待1s,并记录info的日志信息。将标志位stopped设为true,标志着AsyncDispatcher服务停止。
若eventHandlingThread线程不为null,则中断eventHandlingThread线程,等待eventHandlingThread的结束,调用父类中的serviceStop()方法。
@Override
protected void serviceStop() throws Exception {
if (drainEventsOnStop) {
blockNewEvents = true;
LOG.info("AsyncDispatcher is draining to stop, ignoring any new events.");
long endTime = System.currentTimeMillis() + getConfig()
.getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT,
YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT);
synchronized (waitForDrained) {
while (!isDrained() && eventHandlingThread != null
&& eventHandlingThread.isAlive()
&& System.currentTimeMillis() < endTime) {
waitForDrained.wait(100);
LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
eventHandlingThread.getState());
}
}
}
stopped = true;
if (eventHandlingThread != null) {
eventHandlingThread.interrupt();
try {
eventHandlingThread.join();
} catch (InterruptedException ie) {
LOG.warn("Interrupted Exception while stopping", ie);
}
}
//停止
super.serviceStop();
}