
Mastering Spark 2.x: Source Code Analysis of the Master's Message-Processing Loop (Part 2)

大数据开发运维架构 (Big Data Development, Operations and Architecture), 2020-03-04

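The previous article, "Mastering Spark 2.x: Source Code Analysis of the Master's Message-Processing Loop (Part 1)", mainly covered:

1. after a Master failover, the recovery flow for apps, drivers, and executors;

2. once recovery completes, the handling of the replies sent back by the corresponding Apps and Workers.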

    

This article continues with the remaining message types handled by receive() in Master.scala:

1. RevokedLeadership: the leader election agent has revoked this Master's leadership, so the current Master process shuts down.

```scala
case RevokedLeadership =>
  logError("Leadership has been revoked -- master shutting down.")
  System.exit(0)
```
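The following is a minimal sketch to make the dispatch pattern concrete. It is not Spark code. Spark's RpcEndpoint declares receive as a PartialFunction[Any, Unit], and each case in this article is one branch of that function. The message types and the MiniMaster class are hypothetical stand-ins. A shutdown callback replaces System.exit(0) so that the sketch can run without terminating the JVM.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical message types standing in for Spark's DeployMessages
sealed trait MasterMessage
case object RevokedLeadership extends MasterMessage
final case class Heartbeat(workerId: String) extends MasterMessage

// Minimal model of an endpoint whose receive is a PartialFunction[Any, Unit]
class MiniMaster(shutdown: Int => Unit) {
  val log = ArrayBuffer.empty[String]

  val receive: PartialFunction[Any, Unit] = {
    case RevokedLeadership =>
      log += "Leadership has been revoked -- master shutting down."
      shutdown(0) // the real Master calls System.exit(0) here
    case Heartbeat(workerId) =>
      log += s"heartbeat from $workerId"
  }
}

// Usage: record the exit code instead of exiting the process
var exitCode = -1
val master = new MiniMaster(code => exitCode = code)
master.receive(Heartbeat("worker-1"))
master.receive(RevokedLeadership)
```

Because receive is a partial function, a message that matches no case is not handled by it, and isDefinedAt returns false for that message. This sketch does not model how Spark's RPC layer reacts to such a message.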

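2. Registering a Worker with the Master (RegisterWorker). While handling this message, the Master sends reply messages back to the Worker. I won't analyze how the Worker handles those replies here; the Worker-side message handlers will get their own article: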

```scala
case RegisterWorker(
    id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
  logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
    workerHost, workerPort, cores, Utils.megabytesToString(memory)))
  if (state == RecoveryState.STANDBY) {
    // This Master is in STANDBY state: reply with MasterInStandby,
    // which is handled in Worker.scala
    workerRef.send(MasterInStandby)
  } else if (idToWorker.contains(id)) {
    // A Worker with this ID is already registered: reply with RegisterWorkerFailed
    workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
  } else {
    // Build a WorkerInfo from the host, port, cores, and memory sent by the Worker
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      workerRef, workerWebUiUrl)
    // registerWorker() does the actual registration work
    if (registerWorker(worker)) {
      // Persist the Worker through the configured persistence engine (typically ZooKeeper)
      persistenceEngine.addWorker(worker)
      // Tell the Worker that registration succeeded (handled in Worker.scala)
      workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
      // A new Worker has joined, so run scheduling again
      schedule()
    } else {
      // Registration failed: log it and reply with RegisterWorkerFailed (handled in Worker.scala)
      val workerAddress = worker.endpoint.address
      logWarning("Worker registration failed. Attempted to re-register worker at same " +
        "address: " + workerAddress)
      workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
        + workerAddress))
    }
  }
```
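The branch structure of the RegisterWorker handler can be reduced to a pure function that returns the reply the Worker would receive. This is a sketch under simplifying assumptions. The reply types, handleRegisterWorker, and the registerWorker callback are hypothetical stand-ins, and the side effects (persistence and schedule()) are left out.

```scala
// Recovery states of a standalone Master, modeled as an Enumeration
object RecoveryState extends Enumeration {
  val STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY = Value
}

// Hypothetical reply types sent back to the Worker
sealed trait WorkerReply
case object MasterInStandby extends WorkerReply
final case class RegisterWorkerFailed(reason: String) extends WorkerReply
case object RegisteredWorker extends WorkerReply

// Mirrors the if / else-if / else chain of the RegisterWorker case
def handleRegisterWorker(
    state: RecoveryState.Value,
    knownIds: Set[String],
    id: String,
    registerWorker: () => Boolean): WorkerReply =
  if (state == RecoveryState.STANDBY) MasterInStandby
  else if (knownIds.contains(id)) RegisterWorkerFailed("Duplicate worker ID")
  else if (registerWorker()) RegisteredWorker // the real handler also persists and calls schedule()
  else RegisterWorkerFailed("Attempted to re-register worker at same address")
```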

Let's take a closer look at the core registration function, registerWorker(worker), which the handler above calls:

```scala
private def registerWorker(worker: WorkerInfo): Boolean = {
  // There may be one or more refs to dead workers on this same node (w/ different ID's),
  // remove them.
  workers.filter { w =>
    (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
  }.foreach { w =>
    workers -= w
  }

  // If a Worker is already registered at this address and that old entry is UNKNOWN,
  // the Worker restarted while the Master was recovering. The old entry is treated
  // as dead and removed, and the new Worker is accepted. Otherwise the re-registration
  // is rejected.
  // Reminder: during recovery after a leader election, the Master first marks each
  // recovered Worker UNKNOWN and sets it back to ALIVE once the Worker responds.
  val workerAddress = worker.endpoint.address
  if (addressToWorker.contains(workerAddress)) {
    val oldWorker = addressToWorker(workerAddress)
    if (oldWorker.state == WorkerState.UNKNOWN) {
      // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
      // The old worker must thus be dead, so we will remove it and accept the new worker.
      removeWorker(oldWorker)
    } else {
      logInfo("Attempted to re-register worker at same address: " + workerAddress)
      return false
    }
  }
  // Record the Worker in the Master's bookkeeping structures
  workers += worker
  idToWorker(worker.id) = worker
  addressToWorker(workerAddress) = worker
  if (reverseProxy) {
    webUi.addProxyTargets(worker.id, worker.webUiAddress)
  }
  // Registration succeeded
  true
}
```
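The deduplication rules in registerWorker() can be exercised with a small in-memory model. MiniWorker, WorkerRegistry, and the simplified removeWorker() below are hypothetical. The real removeWorker() does much more, including handling the Worker's executors and drivers. The real address key is also the Worker's RpcAddress rather than a host:port string.

```scala
import scala.collection.mutable

object WorkerState extends Enumeration {
  val ALIVE, DEAD, DECOMMISSIONED, UNKNOWN = Value
}

// Stand-in for WorkerInfo: only the fields registerWorker() looks at
final class MiniWorker(val id: String, val host: String, val port: Int,
                       var state: WorkerState.Value = WorkerState.ALIVE) {
  def address: String = s"$host:$port"
}

class WorkerRegistry {
  val workers = mutable.HashSet.empty[MiniWorker]
  val idToWorker = mutable.HashMap.empty[String, MiniWorker]
  val addressToWorker = mutable.HashMap.empty[String, MiniWorker]

  // Simplified: only marks the worker DEAD and drops it from the lookup maps
  private def removeWorker(w: MiniWorker): Unit = {
    w.state = WorkerState.DEAD
    idToWorker -= w.id
    addressToWorker -= w.address
  }

  def registerWorker(worker: MiniWorker): Boolean = {
    // 1. Drop DEAD entries left behind on the same host:port
    workers.filter(w => w.host == worker.host && w.port == worker.port &&
      w.state == WorkerState.DEAD).foreach(workers -= _)
    // 2. Address still registered: accept only if the old entry is UNKNOWN
    addressToWorker.get(worker.address) match {
      case Some(old) if old.state == WorkerState.UNKNOWN => removeWorker(old)
      case Some(_) => return false
      case None =>
    }
    // 3. Record the new worker in every index
    workers += worker
    idToWorker(worker.id) = worker
    addressToWorker(worker.address) = worker
    true
  }
}
```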

           

3. Registering an Application (RegisterApplication). First, the code:

```scala
case RegisterApplication(description, driver) =>
  // If this Master is in STANDBY state, ignore the request
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    // description is an ApplicationDescription carrying the app's launch parameters
    // (name, maxCores, memoryPerExecutorMB, ...); createApplication() wraps it,
    // together with the driver reference, in an ApplicationInfo
    val app = createApplication(description, driver)
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    // Persist the app through the configured persistence engine
    persistenceEngine.addApplication(app)
    // Reply with the appId and the Master's RpcEndpointRef;
    // handled in receive() of StandaloneAppClient.scala
    driver.send(RegisteredApplication(app.id, self))
    // A new Application has joined, so run scheduling again
    schedule()
  }
```
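The two STANDBY behaviors differ. A Worker gets an explicit MasterInStandby reply, while a driver registering an app gets no reply at all. Returning an Option models that difference. AppRegistrar and AppDescription are hypothetical, and the app-NNNN ID format is chosen only for illustration; it does not reproduce Spark's own ID scheme.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for ApplicationDescription and the reply message
final case class AppDescription(name: String, maxCores: Option[Int], memoryPerExecutorMB: Int)
final case class RegisteredApplication(appId: String)

class AppRegistrar(var standby: Boolean) {
  private var nextAppNumber = 0
  val persisted = ArrayBuffer.empty[String]

  // Returns the reply the driver would receive; None means "ignore, don't send response"
  def onRegisterApplication(desc: AppDescription): Option[RegisteredApplication] =
    if (standby) None
    else {
      val appId = f"app-$nextAppNumber%04d" // hypothetical ID format, for illustration only
      nextAppNumber += 1
      persisted += appId // stands in for persistenceEngine.addApplication(app)
      Some(RegisteredApplication(appId))
    }
}
```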

Now let's look at the core registration function, registerApplication(app), which the handler above calls:

```scala
private def registerApplication(app: ApplicationInfo): Unit = {
  // Address of the driver that submitted the app
  val appAddress = app.driver.address
  // An app from this driver address is already registered: do nothing and return
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }
  // Metrics-system registration; not discussed here
  applicationMetricsSystem.registerSource(app.appSource)
  // Record the app in the Master's bookkeeping structures
  apps += app
  idToApp(app.id) = app
  // endpointToApp is a HashMap[RpcEndpointRef, ApplicationInfo]
  // mapping each message sender (the driver endpoint) to its app
  endpointToApp(app.driver) = app
  addressToApp(appAddress) = app
  // Add the app to the waiting list; schedule() allocates resources to it later
  waitingApps += app
  if (reverseProxy) {
    webUi.addProxyTargets(app.id, app.desc.appUiUrl)
  }
}
```
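registerApplication() maintains several indexes over the same ApplicationInfo, plus a list of waiting apps. The sketch below mirrors that bookkeeping with plain Scala collections. MiniApp and AppIndex are hypothetical, and the driver endpoint is modeled as a String instead of an RpcEndpointRef.

```scala
import scala.collection.mutable

// Stand-in for ApplicationInfo; `driver` plays the role of the RpcEndpointRef
final case class MiniApp(id: String, driver: String, driverAddress: String)

class AppIndex {
  val apps = mutable.HashSet.empty[MiniApp]
  val idToApp = mutable.HashMap.empty[String, MiniApp]
  val endpointToApp = mutable.HashMap.empty[String, MiniApp]
  val addressToApp = mutable.HashMap.empty[String, MiniApp]
  val waitingApps = mutable.ArrayBuffer.empty[MiniApp]

  def registerApplication(app: MiniApp): Unit = {
    // Same driver address already registered: ignore the request
    if (addressToApp.contains(app.driverAddress)) return
    apps += app
    idToApp(app.id) = app
    endpointToApp(app.driver) = app
    addressToApp(app.driverAddress) = app
    waitingApps += app // schedule() later allocates executors to waiting apps
  }
}
```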

4. Handling executor state changes (ExecutorStateChanged)

```scala
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
  // Look up the executor's app, then the executor inside it. app.executors maps
  // execId to an ExecutorDesc, which holds the executor's id, application, worker,
  // cores, memory, and its mutable state
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))

  execOption match {
    // The executor is known
    case Some(exec) =>
      // Fetch the app and keep the previous state in oldState before updating it
      val appInfo = idToApp(appId)
      val oldState = exec.state
      exec.state = state
      // RUNNING is only legal coming from LAUNCHING; once an executor is running,
      // reset the app's executor retry count
      if (state == ExecutorState.RUNNING) {
        assert(oldState == ExecutorState.LAUNCHING,
          s"executor $execId state transfer from $oldState to RUNNING is illegal")
        appInfo.resetRetryCount()
      }
      // Send ExecutorUpdated to the driver; handled in StandaloneAppClient.scala
      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))

      // The executor has finished: KILLED, FAILED, LOST, or EXITED
      if (ExecutorState.isFinished(state)) {
        // Remove this executor from the worker and app
        logInfo(s"Removing executor ${exec.fullId} because it is $state")
        // If an application has already finished, preserve its
        // state to display its information properly on the UI
        if (!appInfo.isFinished) {
          appInfo.removeExecutor(exec)
        }
        exec.worker.removeExecutor(exec)

        val normalExit = exitStatus == Some(0)
        // Only retry certain number of times so we don't go into an infinite loop.
        // Important note: this code path is not exercised by tests, so be very careful when
        // changing this `if` condition.
        // On an abnormal exit the app's retry count is incremented. Once it reaches
        // MAX_EXECUTOR_RETRIES (a negative value disables this path) and none of the app's
        // executors is still RUNNING, the app is removed via
        // removeApplication(appInfo, ApplicationState.FAILED). That function, not analyzed
        // here, drops the app from the Master's state, moves it to the completed apps,
        // and updates the web UI
        if (!normalExit
            && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
            && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
          val execs = appInfo.executors.values
          if (!execs.exists(_.state == ExecutorState.RUNNING)) {
            logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
              s"${appInfo.retryCount} times; removing it")
            removeApplication(appInfo, ApplicationState.FAILED)
          }
        }
      }
      // An executor changed state, so run scheduling again
      schedule()
    case None =>
      logWarning(s"Got status update for unknown executor $appId/$execId")
  }
```
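The retry rule in ExecutorStateChanged is easy to misread, so here is a minimal model of just that bookkeeping. RetryTracker is hypothetical. maxRetries plays the role of MAX_EXECUTOR_RETRIES, which is configured through spark.deploy.maxExecutorRetries. othersRunning stands for the check that none of the app's remaining executors is RUNNING. As in the original condition, the counter is incremented on every abnormal exit, even when a negative limit disables removal.

```scala
object ExecutorState extends Enumeration {
  val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
  def isFinished(state: Value): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
}

// Hypothetical model of the per-application retry bookkeeping
final class RetryTracker(maxRetries: Int) {
  private var retryCount = 0
  var removed = false

  def retries: Int = retryCount
  private def incrementRetryCount(): Int = { retryCount += 1; retryCount }

  def onStateChanged(state: ExecutorState.Value, exitStatus: Option[Int], othersRunning: Boolean): Unit = {
    if (state == ExecutorState.RUNNING) retryCount = 0 // appInfo.resetRetryCount()
    if (ExecutorState.isFinished(state)) {
      val normalExit = exitStatus == Some(0)
      // Same short-circuit order as the Master: the counter moves on every abnormal exit
      if (!normalExit && incrementRetryCount() >= maxRetries && maxRetries >= 0) {
        if (!othersRunning) removed = true // removeApplication(appInfo, ApplicationState.FAILED)
      }
    }
  }
}
```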

5. Handling driver state changes (DriverStateChanged). The code is short, so here is only a brief introduction:

```scala
case DriverStateChanged(driverId, state, exception) =>
  state match {
    // ERROR, FINISHED, KILLED, and FAILED are terminal states: remove the driver
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      removeDriver(driverId, state, exception)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
```
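The DriverStateChanged match can be reproduced with a Scala Enumeration. The DriverState values below reflect my understanding of Spark's DriverState, so treat that list as an assumption. onDriverStateChanged and its remove callback are hypothetical stand-ins for the handler.

```scala
object DriverState extends Enumeration {
  val SUBMITTED, RUNNING, FINISHED, RELAUNCHING, UNKNOWN, KILLED, FAILED, ERROR = Value
}

// Terminal states remove the driver; any other state is treated as an unexpected update
def onDriverStateChanged(driverId: String, state: DriverState.Value, remove: String => Unit): Unit =
  state match {
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      remove(driverId)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
```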

Here is the removeDriver() function called in the code above:

```scala
private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]) {
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      // Remove the driver from the in-memory drivers HashSet
      drivers -= driver
      // Keep at most RETAINED_DRIVERS completed drivers (listed in the web UI):
      // once the limit is reached, drop the oldest max(RETAINED_DRIVERS / 10, 1) entries
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      completedDrivers += driver
      // Remove the driver from the persistence engine
      persistenceEngine.removeDriver(driver)
      driver.state = finalState
      driver.exception = exception
      // Remove the driver from the Worker it ran on (see removeDriver() in WorkerInfo.scala)
      driver.worker.foreach(w => w.removeDriver(driver))
      // Run scheduling again after the removal
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
```
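The trimming logic in removeDriver() implements a bounded history that evicts entries in batches. The sketch below is generic. CompletedHistory is hypothetical, and retained plays the role of RETAINED_DRIVERS (spark.deploy.retainedDrivers).

```scala
import scala.collection.mutable.ArrayBuffer

// Bounded history of completed items, trimmed the same way removeDriver() trims completedDrivers
final class CompletedHistory[A](retained: Int) {
  val completed = ArrayBuffer.empty[A]

  def add(item: A): Unit = {
    if (completed.size >= retained) {
      // Drop the oldest 10% (at least one entry) in a single batch
      val toRemove = math.max(retained / 10, 1)
      completed.trimStart(toRemove)
    }
    completed += item
  }
}
```

Evicting max(retained / 10, 1) entries at once means that, after the buffer first fills up, a trim runs once every max(retained / 10, 1) insertions rather than on every insertion.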

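A few more message handlers remain. Their code is simple, so I'll only summarize them briefly:

1). Heartbeat

If the sending Worker is already recorded in the Master's memory, the Master updates that Worker's last-heartbeat time. If the Master has no record of the Worker, it sends the Worker a ReconnectWorker message.

2). WorkerLatestState

For each executor reported by the Worker that the Master cannot find in memory, the Master sends the Worker a KillExecutor message. Likewise, for each reported driver that the Master cannot find, it sends a KillDriver message.

3). UnregisterApplication

The Master removes the app and its driver information, removes the app's executors and sends KillExecutor to the corresponding Workers, and deletes the app from the persistence engine. Finally it calls schedule() again to reschedule resources.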
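Both Heartbeat and WorkerLatestState reconcile what a Worker reports against the Master's in-memory state. The compact model below covers the behavior summarized above. The message types, KnownWorker, and HeartbeatModel are hypothetical stand-ins; Spark's real KillExecutor, for instance, also carries the master URL. The unknown-Worker branch of onHeartbeat follows the simplified description above rather than every detail of the real handler.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the messages the Master sends back to a Worker
sealed trait ToWorker
case object ReconnectWorker extends ToWorker
final case class KillExecutor(appId: String, execId: Int) extends ToWorker
final case class KillDriver(driverId: String) extends ToWorker

// What the Master knows about one Worker
final class KnownWorker(var lastHeartbeat: Long,
                        val executors: Set[(String, Int)], // (appId, execId)
                        val drivers: Set[String])

class HeartbeatModel {
  val idToWorker = mutable.HashMap.empty[String, KnownWorker]

  // Known worker: refresh its heartbeat time; unknown worker: ask it to reconnect
  def onHeartbeat(workerId: String, now: Long): Option[ToWorker] =
    idToWorker.get(workerId) match {
      case Some(w) => w.lastHeartbeat = now; None
      case None    => Some(ReconnectWorker)
    }

  // Diff the Worker's report against the Master's view and kill anything unknown
  def onWorkerLatestState(workerId: String,
                          executors: Seq[(String, Int)],
                          driverIds: Seq[String]): Seq[ToWorker] =
    idToWorker.get(workerId) match {
      case Some(w) =>
        executors.filterNot(w.executors.contains).map { case (app, id) => KillExecutor(app, id) } ++
          driverIds.filterNot(w.drivers.contains).map(KillDriver(_))
      case None => Seq.empty // the real handler only logs a warning here
    }
}
```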


That essentially covers the Master's message handlers. If you found this helpful, please share it on WeChat Moments. Thanks for following!

This article was reposted from 大数据开发运维架构.
