
Mastering Spark 2.x: A Source-Code Walkthrough of the Master's Message-Processing Loop (Part 1)

大数据开发运维架构 2020-03-03

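The previous article, "Spark2.x精通:Standalone模式Master节点启动源码剖析" (Mastering Spark 2.x: Master Startup in Standalone Mode), covered how the Master's main function is invoked and how the Master initializes in Standalone mode. Once started, the Master still has to exchange and process messages with the other nodes in the cluster, for example for its own active/standby failover, Worker liveness checks, and Worker registration. This article is dedicated to the Master's message-handling source code.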

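The receive() function in Master receives messages and dispatches each one to its handler. It handles 12 message types, and we will go through them one by one: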

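1. When a Master is elected Leader, the isLeader() method of LeaderLatchListener is called. That method kicks off the logical data recovery: it sends an ElectedLeader message to the Master, whose handler then restores the persisted data. The code is as follows: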

    // A new Leader has been elected; start recovering data
    case ElectedLeader =>
      // Read the persisted data from the persistence engine selected by
      // spark.deploy.recoveryMode. Three kinds of data are read: ApplicationInfo,
      // DriverInfo, and WorkerInfo. Production clusters usually use ZooKeeper as the
      // persistence engine; the data then lives under
      // <spark.deploy.zookeeper.dir>/master_status, which is "/spark/master_status" by default.
      val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
      // If no application, driver, or worker was persisted, go straight to ALIVE;
      // otherwise enter RECOVERING
      state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
        RecoveryState.ALIVE
      } else {
        RecoveryState.RECOVERING
      }
      logInfo("I have been elected leader! New state: " + state)
      // Start the actual recovery
      if (state == RecoveryState.RECOVERING) {
        // beginRecovery is the main recovery function
        beginRecovery(storedApps, storedDrivers, storedWorkers)
        // Schedule a CompleteRecovery message to this Master itself (it is now the
        // leader) after WORKER_TIMEOUT_MS, so that recovery is finalized even if
        // some apps or workers never respond
        recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(CompleteRecovery)
          }
        }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
      }
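The two decisions made by the ElectedLeader handler can be tried out in isolation. The following is a minimal sketch, not Spark's code: the object ElectedLeaderSketch, the simplified RecoveryState values, and the helper fireAfter are all hypothetical names introduced for illustration, and plain strings stand in for ApplicationInfo, DriverInfo, and WorkerInfo. It shows the "nothing persisted means ALIVE" rule and a one-shot delayed task playing the role of the CompleteRecovery timeout.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object ElectedLeaderSketch {
  // Simplified stand-ins for the two RecoveryState values used by the handler.
  sealed trait RecoveryState
  case object Alive extends RecoveryState
  case object Recovering extends RecoveryState

  // Same rule as the handler: if nothing was persisted there is nothing to
  // recover, so the new leader can become ALIVE immediately.
  def stateAfterElection(
      storedApps: Seq[String],
      storedDrivers: Seq[String],
      storedWorkers: Seq[String]): RecoveryState =
    if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) Alive
    else Recovering

  // Runs `onTimeout` once after `delayMs` on a single scheduler thread
  // (standing in for the delayed CompleteRecovery message).
  // Returns true if the task ran within delayMs + 1000 ms.
  def fireAfter(delayMs: Long)(onTimeout: () => Unit): Boolean = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val fired = new CountDownLatch(1)
    scheduler.schedule(new Runnable {
      override def run(): Unit = { onTimeout(); fired.countDown() }
    }, delayMs, TimeUnit.MILLISECONDS)
    try fired.await(delayMs + 1000, TimeUnit.MILLISECONDS)
    finally scheduler.shutdown()
  }
}
```

In the real Master the delayed task only sends a message to the Master's own endpoint, so the recovery logic still runs on the message-processing thread; the sketch calls the callback directly to stay self-contained.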

Now let's step into beginRecovery(). It has three main parts, looping over storedApps, storedDrivers, and storedWorkers in turn:

    private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
        storedWorkers: Seq[WorkerInfo]) {
      // Iterate over every persisted application
      for (app <- storedApps) {
        logInfo("Trying to recover app: " + app.id)
        try {
          // Re-register the application; note that its state is set to UNKNOWN
          registerApplication(app)
          app.state = ApplicationState.UNKNOWN
          // Send MasterChanged to the app's driver; the message is received
          // and handled in StandaloneAppClient.scala
          app.driver.send(MasterChanged(self, masterWebUiUrl))
        } catch {
          case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
        }
      }
      // Recover the drivers by updating the set of drivers the master maintains
      for (driver <- storedDrivers) {
        // Here we just read in the list of drivers. Any drivers associated with now-lost workers
        // will be re-launched when we detect that the worker is missing.
        drivers += driver
      }
      // Recover the Workers
      for (worker <- storedWorkers) {
        logInfo("Trying to recover worker: " + worker.id)
        try {
          // Re-register the Worker; note that its state is set to UNKNOWN
          registerWorker(worker)
          worker.state = WorkerState.UNKNOWN
          // Send MasterChanged to the Worker. In Worker.scala the handler records the
          // new master's URL and reference, marks the Worker as connected, and does
          // some further processing
          worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
        } catch {
          case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
        }
      }
    }
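One detail of beginRecovery() worth isolating is that each app and each worker is handled inside its own try/catch, so one unreachable endpoint does not abort the recovery of the others. A minimal sketch of that pattern follows; Endpoint, Peer, and recover are hypothetical names used only for illustration, states are plain strings, and unlike the Spark code (which only logs) the sketch returns the ids whose notification failed so the behavior can be checked.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

object BeginRecoverySketch {
  // Hypothetical stand-in for an RPC endpoint reference; `send` may throw
  // when the peer cannot be reached.
  trait Endpoint { def send(msg: String): Unit }

  // A persisted app or worker, reduced to an id, an endpoint, and a state.
  final class Peer(val id: String, val endpoint: Endpoint) {
    var state: String = "STORED"
  }

  // Re-registers every stored peer, marks it UNKNOWN, and notifies it that the
  // master changed. A failure for one peer is recorded and the loop continues.
  def recover(stored: Seq[Peer], registry: mutable.Map[String, Peer]): Seq[String] = {
    val failed = mutable.ArrayBuffer.empty[String]
    for (peer <- stored) {
      try {
        registry(peer.id) = peer
        peer.state = "UNKNOWN"
        peer.endpoint.send("MasterChanged")
      } catch {
        case NonFatal(_) => failed += peer.id
      }
    }
    failed.toSeq
  }
}
```

A peer whose notification failed stays registered in the UNKNOWN state, which is exactly what lets the later cleanup step recognize it as unresponsive.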

The app.driver.send(MasterChanged(self, masterWebUiUrl)) call above sends a MasterChanged message, which is handled in StandaloneAppClient.scala:

    case MasterChanged(masterRef, masterWebUiUrl) =>
      logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
      master = Some(masterRef)
      alreadyDisconnected = false
      // Finally, reply to the Master with MasterChangeAcknowledged;
      // how the Master handles it is explained next
      masterRef.send(MasterChangeAcknowledged(appId.get))

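Note: the client replies to the Master with a MasterChangeAcknowledged message, so let's first look at how the Master handles it.

2. The Master receives the MasterChangeAcknowledged message sent by StandaloneAppClient. The Master-side code is as follows: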

    case MasterChangeAcknowledged(appId) =>
      idToApp.get(appId) match {
        case Some(app) =>
          logInfo("Application has been re-registered: " + appId)
          // Set the app's state to WAITING, pending the next step
          app.state = ApplicationState.WAITING
        case None =>
          logWarning("Master change ack from unknown app: " + appId)
      }

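The worker.endpoint.send(MasterChanged(self, masterWebUiUrl)) call in beginRecovery() also sends a MasterChanged message, this time handled in Worker.scala: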

    case MasterChanged(masterRef, masterWebUiUrl) =>
      logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
      // Record the new master's URL and reference, mark the Worker as connected,
      // and cancel any earlier re-registration attempts
      changeMaster(masterRef, masterWebUiUrl, masterRef.address)
      // Build a lightweight ExecutorDescription for each executor on this node
      val execs = executors.values.
        map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
      // Finally, send WorkerSchedulerStateResponse to the Master;
      // how the Master handles it is explained next
      masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))

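Note: the Worker replies to the Master with a WorkerSchedulerStateResponse message, so let's first look at how the Master handles it.

3. The Master receives the WorkerSchedulerStateResponse message sent by the Worker. The Master-side code is as follows: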

    case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
      idToWorker.get(workerId) match {
        case Some(worker) =>
          logInfo("Worker has been re-registered: " + workerId)
          // Mark the Worker ALIVE, since it has re-registered with the new Master
          worker.state = WorkerState.ALIVE
          // Handle the executors first, then the drivers.
          // Keep only the executors whose application is still known to the Master
          val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
          for (exec <- validExecutors) {
            // Look up the application this executor belongs to
            val app = idToApp.get(exec.appId).get
            // Allocate the corresponding CPU cores on this Worker to the executor
            val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
            // Record the executor on the Worker
            worker.addExecutor(execInfo)
            execInfo.copyState(exec)
          }
          // Handle the drivers
          for (driverId <- driverIds) {
            drivers.find(_.id == driverId).foreach { driver =>
              // Attach the driver to this Worker again and mark it RUNNING
              driver.worker = Some(worker)
              driver.state = DriverState.RUNNING
              worker.addDriver(driver)
            }
          }
        case None =>
          logWarning("Scheduler state from unknown worker: " + workerId)
      }
      // If recovery can already be completed, call completeRecovery() directly
      // instead of waiting for the scheduled CompleteRecovery message
      if (canCompleteRecovery) { completeRecovery() }
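The article does not show canCompleteRecovery itself. Assuming it simply checks that no recovered app or worker is still in the UNKNOWN state, the interplay between the two acknowledgement handlers and early completion can be sketched as below. Tracker and its methods are hypothetical names for illustration only; the real Master keeps this state on ApplicationInfo and WorkerInfo objects.

```scala
import scala.collection.mutable

object RecoveryAckSketch {
  sealed trait State
  case object Unknown extends State // notified, no reply yet
  case object Waiting extends State // app sent MasterChangeAcknowledged
  case object Alive extends State   // worker sent WorkerSchedulerStateResponse

  // Tracks the apps and workers that were notified during recovery.
  final class Tracker(appIds: Seq[String], workerIds: Seq[String]) {
    private val apps = mutable.Map(appIds.map(_ -> (Unknown: State)): _*)
    private val workers = mutable.Map(workerIds.map(_ -> (Unknown: State)): _*)

    // Assumed rule: recovery can finish once nobody is left in UNKNOWN.
    def canCompleteRecovery: Boolean =
      !apps.values.exists(_ == Unknown) && !workers.values.exists(_ == Unknown)

    // Replies from unknown ids are ignored (the Master only logs a warning).
    def appAcknowledged(id: String): Boolean = {
      if (apps.contains(id)) apps(id) = Waiting
      canCompleteRecovery
    }

    def workerResponded(id: String): Boolean = {
      if (workers.contains(id)) workers(id) = Alive
      canCompleteRecovery
    }
  }
}
```

With one app and one worker, the first reply leaves canCompleteRecovery false and the second flips it to true, which is the point at which the Master would finish recovery without waiting for the timeout.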

                  

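Summary: pay close attention here. This is important, important, important (worth saying three times)!

To recap: the handlers above recover the apps, drivers, and workers, and the thing to watch is how each state changes. Once they have run, an app that acknowledged the new Master is WAITING, a driver reported by a worker is RUNNING, and a worker that responded is ALIVE.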

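4. In step 1, right after calling beginRecovery(), the Master schedules a CompleteRecovery message to itself, delivered after WORKER_TIMEOUT_MS. Here is the Master-side handler:

    case CompleteRecovery => completeRecovery()

Let's go straight to the completeRecovery() function: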

    private def completeRecovery() {
      // Ensure "only-once" recovery semantics using a short synchronization period.
      // The state should normally be RECOVERING at this point; anything else means
      // recovery has already run, so return. Otherwise switch to COMPLETING_RECOVERY
      if (state != RecoveryState.RECOVERING) { return }
      state = RecoveryState.COMPLETING_RECOVERY

      // Kill off any workers and apps that didn't respond to us.
      // How do we tell? A worker that responded is ALIVE and an app that responded is
      // WAITING; anything still UNKNOWN was never handled above, i.e. it did not respond.
      workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
      apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

      // Update the state of recovered apps to RUNNING
      // (I don't quite see why the ack handler sets WAITING first rather than
      // setting RUNNING directly)
      apps.filter(_.state == ApplicationState.WAITING).foreach(_.state = ApplicationState.RUNNING)

      // Reschedule drivers which were not claimed by any workers
      drivers.filter(_.worker.isEmpty).foreach { d =>
        logWarning(s"Driver ${d.id} was not found after master recovery")
        if (d.desc.supervise) {
          logWarning(s"Re-launching ${d.id}")
          // The driver scheduling method will be analyzed in a later article
          relaunchDriver(d)
        } else {
          removeDriver(d.id, DriverState.ERROR, None)
          logWarning(s"Did not re-launch ${d.id} because it was not supervised")
        }
      }
      // Switch the Master's state to ALIVE
      state = RecoveryState.ALIVE
      // A core scheduling function: it reallocates resources and schedules drivers
      // and applications; it will be covered in detail later
      schedule()
      logInfo("Recovery complete - resuming operations!")
    }
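The cleanup rules in completeRecovery() reduce to two small decisions, which the following sketch models as pure functions. This is an illustration under simplifying assumptions, not the Master's implementation: Driver, Action, decide, and settleApps are hypothetical names, app states are plain strings, and "dropping" an app here stands in for the call to finishApplication.

```scala
object CompleteRecoverySketch {
  // A recovered driver: whether some worker reported it, and whether it was
  // submitted with the supervise flag.
  final case class Driver(id: String, claimedByWorker: Boolean, supervise: Boolean)

  sealed trait Action
  final case class Keep(id: String) extends Action     // still running on a worker
  final case class Relaunch(id: String) extends Action // unclaimed but supervised
  final case class Remove(id: String) extends Action   // unclaimed, not supervised

  // Mirrors the branch on d.worker.isEmpty and d.desc.supervise.
  def decide(d: Driver): Action =
    if (d.claimedByWorker) Keep(d.id)
    else if (d.supervise) Relaunch(d.id)
    else Remove(d.id)

  // Apps still UNKNOWN never answered and are dropped; apps that acknowledged
  // (WAITING) are promoted to RUNNING; any other state is left unchanged.
  def settleApps(apps: Map[String, String]): Map[String, String] =
    apps.collect {
      case (id, "WAITING") => id -> "RUNNING"
      case (id, state) if state != "UNKNOWN" => id -> state
    }
}
```

Modeling the decisions as functions that return values makes the rules easy to check in isolation; the real method performs the same branching with side effects on the Master's collections.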
This article is reposted from 大数据开发运维架构.
