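The previous article, "Spark 2.x Mastery: Master-Side Message Loop Source Code Analysis (Part 1)", covered:
1. The recovery flow for app, driver, and executor data after a master failover;
2. How the replies from Apps and Workers are handled once data recovery completes.
This article continues with the remaining message types handled by the receive() function in Master.scala:
1. RevokedLeadership: the current Master has lost its leadership, so it logs an error and terminates its own process.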
case RevokedLeadership =>
  logError("Leadership has been revoked -- master shutting down.")
  System.exit(0)
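2. Registering a Worker with the Master. While handling this message the Master sends acknowledgement-style messages back to the Worker. The Worker-side handling is not analyzed here; a later article will cover the Worker's message handler separately: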
case RegisterWorker(
    id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
  logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
    workerHost, workerPort, cores, Utils.megabytesToString(memory)))
  if (state == RecoveryState.STANDBY) {
    // This Master is in STANDBY: just send MasterInStandby to the Worker;
    // the message is handled in Worker.scala
    workerRef.send(MasterInStandby)
  } else if (idToWorker.contains(id)) {
    // A Worker with this ID is already registered: send RegisterWorkerFailed
    workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
  } else {
    // Build a WorkerInfo instance from the host, port, cores, and memory that were passed in
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      workerRef, workerWebUiUrl)
    // This is the core function that actually registers the Worker
    if (registerWorker(worker)) {
      // Add the registered Worker to the instantiated persistence engine
      // (usually ZooKeeper)
      persistenceEngine.addWorker(worker)
      // Registration succeeded: send RegisteredWorker to tell the Worker it is registered;
      // the message is handled in Worker.scala
      workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
      // A new Worker has joined, so reschedule
      schedule()
    } else {
      // Registration failed: log a warning and send RegisterWorkerFailed to the Worker;
      // the message is handled in Worker.scala
      val workerAddress = worker.endpoint.address
      logWarning("Worker registration failed. Attempted to re-register worker at same " +
        "address: " + workerAddress)
      workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
        + workerAddress))
    }
  }
Let's look more closely at registerWorker(worker), the core registration function called in the code above:
private def registerWorker(worker: WorkerInfo): Boolean = {
  // There may be one or more refs to dead workers on this same node (w/ different ID's),
  // remove them.
  // (That is: drop every DEAD Worker entry that has the same host and port.)
  workers.filter { w =>
    (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
  }.foreach { w =>
    workers -= w
  }

  // The next few lines mean roughly this: if a Worker registers while the entry at the
  // same address is still UNKNOWN, the Worker was restarted during recovery. The old
  // entry is treated as dead and removed first.
  // Reminder: after a leader election, recovery sets each Worker's state to UNKNOWN
  // first and then to ALIVE.
  val workerAddress = worker.endpoint.address
  if (addressToWorker.contains(workerAddress)) {
    val oldWorker = addressToWorker(workerAddress)
    if (oldWorker.state == WorkerState.UNKNOWN) {
      // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
      // The old worker must thus be dead, so we will remove it and accept the new worker.
      removeWorker(oldWorker)
    } else {
      logInfo("Attempted to re-register worker at same address: " + workerAddress)
      return false
    }
  }

  // Record the Worker in the Master's bookkeeping collections
  workers += worker
  idToWorker(worker.id) = worker
  addressToWorker(workerAddress) = worker
  if (reverseProxy) {
    webUi.addProxyTargets(worker.id, worker.webUiAddress)
  }
  // Registration is complete: return true
  true
}
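To make the three outcomes of registerWorker easier to see (clean up DEAD entries, replace an UNKNOWN entry, reject a live duplicate), here is a minimal self-contained sketch. The `Worker`, `Registry`, and `State` types are hypothetical stand-ins invented for illustration, not Spark's `WorkerInfo` or `WorkerState`, and the sketch assumes that removing a worker marks it dead and drops it from the two lookup maps.

```scala
import scala.collection.mutable

object RegisterWorkerSketch {
  // Hypothetical stand-ins for Spark's WorkerState values.
  sealed trait State
  case object Alive extends State
  case object Dead extends State
  case object Unknown extends State

  // Hypothetical stand-in for WorkerInfo; identity-based equality, like a plain class.
  final class Worker(val id: String, val host: String, val port: Int, var state: State = Alive) {
    def address: String = s"$host:$port"
  }

  final class Registry {
    val workers = mutable.HashSet.empty[Worker]
    val idToWorker = mutable.HashMap.empty[String, Worker]
    val addressToWorker = mutable.HashMap.empty[String, Worker]

    // Assumption: removal marks the worker dead and drops it from the lookup maps,
    // while the entry itself stays in `workers` until the next cleanup.
    def remove(worker: Worker): Unit = {
      worker.state = Dead
      idToWorker -= worker.id
      addressToWorker -= worker.address
    }

    def register(worker: Worker): Boolean = {
      // 1. Drop DEAD entries that share the new worker's address.
      workers.filter(w => w.address == worker.address && w.state == Dead)
        .foreach(w => workers -= w)

      // 2. Replace an UNKNOWN entry at the same address; reject any other duplicate.
      val accepted = addressToWorker.get(worker.address) match {
        case Some(old) if old.state == Unknown => remove(old); true
        case Some(_) => false
        case None => true
      }

      // 3. Record the accepted worker in every index.
      if (accepted) {
        workers += worker
        idToWorker(worker.id) = worker
        addressToWorker(worker.address) = worker
      }
      accepted
    }
  }
}
```

Registering a second worker at an address whose current entry is still alive returns false; once that entry is marked `Unknown`, the same call succeeds and the old entry is dropped from the lookup maps.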
3. Registering an Application. First, the code:
case RegisterApplication(description, driver) =>
  // If this Master is in STANDBY, ignore the message
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    // description is an ApplicationDescription holding the detailed parameters used to
    // launch this app (name, maxCores, memoryPerExecutorMB, ...); the resulting
    // ApplicationInfo keeps a reference to it as one of its fields
    val app = createApplication(description, driver)
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    // Persist the app through the configured persistence engine
    persistenceEngine.addApplication(app)
    // Registration is done: reply with the result, which carries the app ID and the
    // Master's RpcEndpointRef. The reply is handled by receive() in
    // StandaloneAppClient.scala
    driver.send(RegisteredApplication(app.id, self))
    // A new Application has joined, so reschedule
    schedule()
  }
Let's look more closely at registerApplication(app), the core registration function called in the code above:
private def registerApplication(app: ApplicationInfo): Unit = {
  // Get the driver's address
  val appAddress = app.driver.address
  // If an app is already registered at this address, this is a re-registration:
  // do nothing and return
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }

  // Metrics-system bookkeeping (also a registration call); not discussed here
  applicationMetricsSystem.registerSource(app.appSource)
  // Record the app in the Master's bookkeeping collections
  apps += app
  idToApp(app.id) = app
  // endpointToApp is a HashMap[RpcEndpointRef, ApplicationInfo] that maps each
  // message sender to its app
  endpointToApp(app.driver) = app
  addressToApp(appAddress) = app
  // Put the registered app on the waiting queue for later scheduling
  waitingApps += app
  if (reverseProxy) {
    webUi.addProxyTargets(app.id, app.desc.appUiUrl)
  }
}
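The key property of registerApplication is that a second registration from the same driver address is a no-op. The sketch below illustrates that with hypothetical `App` and `AppRegistry` types; unlike the real function, which returns Unit, `register` here returns a Boolean purely so the outcome is easy to observe.

```scala
import scala.collection.mutable

object RegisterAppSketch {
  // Hypothetical stand-in for ApplicationInfo: an ID plus the driver's address.
  final case class App(id: String, driverAddress: String)

  final class AppRegistry {
    val apps = mutable.HashSet.empty[App]
    val idToApp = mutable.HashMap.empty[String, App]
    val addressToApp = mutable.HashMap.empty[String, App]
    val waitingApps = mutable.ArrayBuffer.empty[App]

    // Returns true if the app was newly registered, false for a re-registration.
    // (The Boolean result is an illustration aid, not part of the original function.)
    def register(app: App): Boolean =
      if (addressToApp.contains(app.driverAddress)) {
        false
      } else {
        apps += app
        idToApp(app.id) = app
        addressToApp(app.driverAddress) = app
        waitingApps += app // queued until the scheduler assigns resources
        true
      }
  }
}
```

Keying the duplicate check on the driver address rather than on the app ID means a driver that resends its registration does not end up queued twice.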
4. Handling Executor state change messages
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
  // Find the app that owns this executor, then fetch the executor's descriptor
  // (an ExecutorDesc) from that app; it exposes the state, application, worker,
  // and fullId members used below
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    // The executor is known to the Master
    case Some(exec) =>
      val appInfo = idToApp(appId)
      val oldState = exec.state
      exec.state = state
      // The three lines above fetch the app info, save the previous state in oldState,
      // and apply the new state.
      // If the new state is RUNNING, assert that the previous state was LAUNCHING
      // and reset the app's retry count
      if (state == ExecutorState.RUNNING) {
        assert(oldState == ExecutorState.LAUNCHING,
          s"executor $execId state transfer from $oldState to RUNNING is illegal")
        appInfo.resetRetryCount()
      }
      // Send ExecutorUpdated to the driver; it is handled in StandaloneAppClient.scala
      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
      // The executor has finished; the state may be KILLED, FAILED, LOST, or EXITED
      if (ExecutorState.isFinished(state)) {
        // Remove this executor from the worker and app
        logInfo(s"Removing executor ${exec.fullId} because it is $state")
        // If an application has already finished, preserve its
        // state to display its information properly on the UI
        // (so the executor is removed from the app only if the app has not finished)
        if (!appInfo.isFinished) {
          appInfo.removeExecutor(exec)
        }
        exec.worker.removeExecutor(exec)

        val normalExit = exitStatus == Some(0)
        // Only retry certain number of times so we don't go into an infinite loop.
        // Important note: this code path is not exercised by tests, so be very careful when
        // changing this `if` condition.
        // In short: on an abnormal exit, once the app's retry count reaches
        // MAX_EXECUTOR_RETRIES and no executor is still RUNNING, the app is removed via
        // removeApplication(appInfo, ApplicationState.FAILED). That function is not
        // analyzed here; it deletes the app's info from the Master, adds the app to the
        // completed apps, and updates what the web UI shows.
        if (!normalExit
            && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
            && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
          val execs = appInfo.executors.values
          if (!execs.exists(_.state == ExecutorState.RUNNING)) {
            logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
              s"${appInfo.retryCount} times; removing it")
            removeApplication(appInfo, ApplicationState.FAILED)
          }
        }
      }
      // The executor's state changed, so reschedule
      schedule()
    case None =>
      logWarning(s"Got status update for unknown executor $appId/$execId")
  }
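The retry rule in this handler is easy to misread, so here is a small sketch of just that rule: an abnormal exit increments a counter, an executor reaching RUNNING resets it, and the app is failed only when the counter reaches the limit while nothing is running. `AppState` and its members are hypothetical names for illustration; only the shape of the condition follows the handler above.

```scala
object ExecutorRetrySketch {
  // Hypothetical model of one application's retry bookkeeping.
  // A negative maxRetries disables the failure path, mirroring the "< 0" comment above.
  final class AppState(val maxRetries: Int) {
    private var retryCount = 0
    var runningExecutors = 0
    var failed = false

    // An executor reached RUNNING: count it and reset the retry counter.
    def onExecutorRunning(): Unit = {
      runningExecutors += 1
      retryCount = 0
    }

    // An executor finished; exitCode == Some(0) is the only "normal" exit.
    def onExecutorFinished(exitCode: Option[Int], wasRunning: Boolean): Unit = {
      if (wasRunning) runningExecutors -= 1
      val normalExit = exitCode.contains(0)
      if (!normalExit) {
        retryCount += 1
        // Fail the app only if the limit is enabled, reached, and nothing is running.
        if (maxRetries >= 0 && retryCount >= maxRetries && runningExecutors == 0) {
          failed = true
        }
      }
    }
  }
}
```

Because the counter is reset whenever any executor starts running, the limit effectively counts consecutive abnormal exits rather than the lifetime total.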
5. Handling Driver state change messages. There is not much code here, so only a brief walkthrough:
case DriverStateChanged(driverId, state, exception) =>
  state match {
    // If the state is ERROR, FINISHED, KILLED, or FAILED, remove the driver
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      removeDriver(driverId, state, exception)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
Now look at the removeDriver() function called in the code above:
private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]) {
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      // Remove the driver from the in-memory drivers HashSet
      drivers -= driver
      // Keep the completed-drivers list bounded: once it holds RETAINED_DRIVERS entries,
      // drop the oldest tenth (at least one) before appending
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      completedDrivers += driver
      // Remove the driver from the instantiated persistence engine
      persistenceEngine.removeDriver(driver)
      driver.state = finalState
      driver.exception = exception
      // Remove the driver from the Worker it ran on; see removeDriver() in WorkerInfo.scala
      driver.worker.foreach(w => w.removeDriver(driver))
      // After removal, reschedule
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
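The trimming of completedDrivers is a general pattern for keeping a history list bounded without trimming on every insert. A minimal sketch follows; `addBounded` is a hypothetical helper name, and it uses `ArrayBuffer.remove(0, n)` in place of `trimStart`, with an extra guard so it never removes more elements than the buffer holds.

```scala
import scala.collection.mutable.ArrayBuffer

object RetainedListSketch {
  // Append `item`, first dropping the oldest ~10% (at least one element)
  // when the buffer has already reached `retained` entries.
  def addBounded[A](completed: ArrayBuffer[A], item: A, retained: Int): Unit = {
    if (completed.size >= retained) {
      val toRemove = math.min(math.max(retained / 10, 1), completed.size)
      completed.remove(0, toRemove) // the oldest entries sit at the front
    }
    completed += item
  }
}
```

Dropping a tenth at a time, instead of a single element, means the trim runs only once every `retained / 10` insertions after the list fills up.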
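A few more message handlers remain. Their code is simple, so instead of walking through each one I will just list them briefly:
1). Heartbeat
If the sending worker is already held in the Master's memory, the Master updates that worker's last-heartbeat time; if the Master has no record of the worker yet, it sends the worker a ReconnectWorker message.
2). WorkerLatestState
For each executor the worker reports that the Master cannot find in memory, the Master sends the worker a KillExecutor message; for each driver the worker reports that the Master cannot find in memory, it sends a KillDriver message.
3). UnregisterApplication
Removes the app and driver information from the Master; removes the app's executors and sends KillExecutor messages to the Workers; deletes the app from the persistence engine; and finally calls schedule() to reschedule resources.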
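As an illustration of the Heartbeat rule as described in the list above, here is a minimal sketch. `Tracker`, `Reply`, and the message objects are hypothetical names defined only for this example; the sketch covers just the two cases the text mentions (known worker, unknown worker).

```scala
import scala.collection.mutable

object HeartbeatSketch {
  // Hypothetical reply type: what the master sends back, if anything.
  sealed trait Reply
  case object NoReply extends Reply
  case object ReconnectWorker extends Reply

  final class Tracker {
    // worker ID -> timestamp (ms) of the most recent heartbeat
    private val lastHeartbeat = mutable.HashMap.empty[String, Long]

    def register(id: String, now: Long): Unit = lastHeartbeat.update(id, now)

    // Known worker: refresh its timestamp. Unknown worker: ask it to reconnect.
    def heartbeat(id: String, now: Long): Reply =
      if (lastHeartbeat.contains(id)) {
        lastHeartbeat.update(id, now)
        NoReply
      } else {
        ReconnectWorker
      }

    def lastSeen(id: String): Option[Long] = lastHeartbeat.get(id)
  }
}
```

Passing the current time in as a parameter, rather than reading the clock inside `heartbeat`, keeps the sketch deterministic and easy to test.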
That covers most of the Master's message handlers. If you found this helpful, please share it. Thanks for following!




