暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Spark 启动 | Worker 启动流程详解

大数据记事本 2021-01-13
1389

一、往期回顾

    从上一篇对 Spark 启动脚本的分析中可以知道,Worker 类的全限定名称如下:
    org.apache.spark.deploy.worker.Worker

        启动 Worker 实际执行的就是伴生对象的 main 方法。

        Worker 和 Master 都是 Endpoint 的子类,所以启动流程很相似,即创建 RpcEnv 对象,然后创建 Worker 对象并执行 onStart 方法开启其生命周期。

        由于 Master 和 Worker 采用的是主从架构,所以 Worker 启动后最重要的事就是向 Master 注册,注册成功后定期发送心跳消息告诉 Master 自己活着。

    二、Worker 启动流程图解

    三、Worker 启动流程源码详解

    Worker 伴生对象的 main 方法:

      def main(argStrings: Array[String]) {
      Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
      exitOnUncaughtException = false))
      Utils.initDaemon(log)
      //第一步:解析参数
      //创建conf对象,读取spark默认的配置参数
      val conf = new SparkConf


      //解析程序运行参数,读取以spark. 开头的环境变量
      val args = new WorkerArguments(argStrings, conf)


      //TODO 第二步:启动 Worker 的 RPC 服务端
      val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
      args.memory, args.masters, args.workDir, conf = conf)


      val externalShuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
      val sparkWorkerInstances = scala.sys.env.getOrElse("SPARK_WORKER_INSTANCES", "1").toInt
      require(externalShuffleServiceEnabled == false || sparkWorkerInstances <= 1,
      "Starting multiple workers on one host is failed because we may launch no more than one " +
      "external shuffle service on each host, please set spark.shuffle.service.enabled to " +
      "false or set SPARK_WORKER_INSTANCES to 1 to resolve the conflict.")
      rpcEnv.awaitTermination()
      }

      该方法主要做了两件事:

      • 解析参数:包括配置文件、程序运行参数及环境变量

      • 启动 Worker 的 RPC 服务端

          这里启动 Worker 的 RPC 服务端调用的是 startRpcEnvAndEndpoint 方法,逻辑如下:

        def startRpcEnvAndEndpoint(
        host: String,
        port: Int,
        webUiPort: Int,
        cores: Int,
        memory: Int,
        masterUrls: Array[String],
        workDir: String,
        workerNumber: Option[Int] = None,
        conf: SparkConf = new SparkConf): RpcEnv = {


        val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
        //初始化SecurityManager
        val securityMgr = new SecurityManager(conf)


        //TODO 创建RpcEnv对象
        val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)


        //获取可能存在的多个 Master 的地址
        val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))


        //TODO 创建EndPoint对象并启动,用于网络通信
        // 做两件事:
        // 1.创建Worker对象,调用onStart 方法开启生命周期
        // 2.Worker 对象向 rpcEnv 中的 Dispatcher 对象注册
        rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
        masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
        rpcEnv
        }

        该方法主要包括三个步骤:

        • 第一步:创建 RpcEnv 对象

        • 第二步:创建 Worker 对象并调用 onStart 方法开启生命周期

        • 第三步:将 Worker 对象注册到 RpcEnv 管理的 Dispatcher 对象中

            其中,第一步和第三步在分析 Master 启动流程时已经进行了详细说明,这里不再赘述。重点看 Worker 对象的初始化及其 onStart 方法

        注意:由于高可用的机制,一个 Worker 可能对应多个 Master,其中只有一个状态是 ALIVE,其它状态都是 STANDBY

        Worker 定义:

          private[deploy] class Worker(
          override val rpcEnv: RpcEnv,
          webUiPort: Int,
          cores: Int,//总核数
          memory: Int,//总的可用内存
          masterRpcAddresses: Array[RpcAddress],//Master地址,可能存在多个
          endpointName: String,
          workDirPath: String = null,//工作目录路径
          val conf: SparkConf,
          val securityMgr: SecurityManager,
          externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null)
          extends ThreadSafeRpcEndpoint with Logging {


          //设置主机名和端口号
          private val host = rpcEnv.address.host
          private val port = rpcEnv.address.port


          //主机名和端口号合法性验证
          Utils.checkHost(host)
          assert (port > 0)


          //进行发送消息调度的线程池
          private val forwordMessageScheduler =
          ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")


          //也是一个线程池,完成清理工作的线程池。清理任务执行过程中的jar包,文件等,每隔30分钟执行一次
          private val cleanupThreadExecutor = ExecutionContext.fromExecutorService(
          ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-thread"))


          //心跳间隔时间,用 60/4 = 15秒
          private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 4


          private val INITIAL_REGISTRATION_RETRIES = 6
          private val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10
          private val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500
          private val REGISTRATION_RETRY_FUZZ_MULTIPLIER = {
          val randomNumberGenerator = new Random(UUID.randomUUID.getMostSignificantBits)
          randomNumberGenerator.nextDouble + FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND
          }
          private val INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS = (math.round(10 *
          REGISTRATION_RETRY_FUZZ_MULTIPLIER))
          private val PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS = (math.round(60
          * REGISTRATION_RETRY_FUZZ_MULTIPLIER))


          private val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)


          //cleanup定时任务的周期:30分钟
          private val CLEANUP_INTERVAL_MILLIS =
          conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000


          //应用文件夹/数据的ttl,ttl过期时间为7天,过期后将被清理
          private val APP_DATA_RETENTION_SECONDS =
          conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)


          //是否清理 executor 存在的非 shuffer 产生的文件,默认为true
          private val CLEANUP_NON_SHUFFLE_FILES_ENABLED =
          conf.getBoolean("spark.storage.cleanupFilesAfterExecutorExit", true)


          private val testing: Boolean = sys.props.contains("spark.testing")


          //Master节点的RpcEndpointRef代理对象
          private var master: Option[RpcEndpointRef] = None


          private val preferConfiguredMasterAddress =
          conf.getBoolean("spark.worker.preferConfiguredMasterAddress", false)


          private var masterAddressToConnect: Option[RpcAddress] = None
          private var activeMasterUrl: String = ""
          private[worker] var activeMasterWebUiUrl : String = ""
          private var workerWebUiUrl: String = ""
          private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString
          //标识该worker节点是否注册成功
          private var registered = false
          private var connected = false
          //生成workerID
          private val workerId = generateWorkerId()
          private val sparkHome =
          if (testing) {
          assert(sys.props.contains("spark.test.home"), "spark.test.home is not set!")
          new File(sys.props("spark.test.home"))
          } else {
          new File(sys.env.get("SPARK_HOME").getOrElse("."))
          }


          //工作目录
          var workDir: File = null


          //当前Worker上完成的Executor的集合
          val finishedExecutors = new LinkedHashMap[String, ExecutorRunner]


          //当前Worker上的Driver集合
          val drivers = new HashMap[String, DriverRunner]


          //当前Worker上executor集合
          val executors = new HashMap[String, ExecutorRunner]


          //当前Worker上完成的Driver集合
          val finishedDrivers = new LinkedHashMap[String, DriverRunner]


          //当前Worker上Application的工作文件夹
          val appDirectories = new HashMap[String, Seq[String]]


          //当前Worker上已完成的Application
          val finishedApps = new HashSet[String]


          val retainedExecutors = conf.getInt("spark.worker.ui.retainedExecutors",
          WorkerWebUI.DEFAULT_RETAINED_EXECUTORS)
          val retainedDrivers = conf.getInt("spark.worker.ui.retainedDrivers",
          WorkerWebUI.DEFAULT_RETAINED_DRIVERS)
          //TODO 启动shuffle服务
          private val shuffleService = if (externalShuffleServiceSupplier != null) {
          externalShuffleServiceSupplier.get()
          } else {
          //默认实现
          new ExternalShuffleService(conf, securityMgr)
          }


          private val publicAddress = {
          val envVar = conf.getenv("SPARK_PUBLIC_DNS")
          if (envVar != null) envVar else host
          }
          private var webUi: WorkerWebUI = null


          private var connectionAttemptCount = 0


          //性能监控的组件
          private val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
          private val workerSource = new WorkerSource(this)


          val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false)


          private var registerMasterFutures: Array[JFuture[_]] = null
          private var registrationRetryTimer: Option[JScheduledFuture[_]] = None


          //向Master注册的线程池,由于向Master注册是同步阻塞的,所以线程个数和Master的个数是一致的
          private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
          "worker-register-master-threadpool",
          masterRpcAddresses.length // Make sure we can register with all masters at the same time
          )


          //用于记录资源
          //已使用cpu个数
          var coresUsed = 0
          //已使用内存大小
          var memoryUsed = 0
          ...
          }

          可以看到,Worker 中定义了多个属性。重点看以下几个:

          • cores:cup总个数

          • memory:可用内存

          • masterRpcAddresses:可能存在的多个 Master 地址

          • workDirPath:工作目录路径

          • forwordMessageScheduler:进行发送消息调度的线程池

          • cleanupThreadExecutor:完成清理工作的线程池。清理任务执行过程中上传的 jar 包、文件等,清理周期由参数 spark.worker.cleanup.interval 设置,默认每隔 30 分钟清理一次

          • HEARTBEAT_MILLIS:心跳的间隔时间。计算逻辑是 spark.worker.timeout 4,即 60 4 = 15 秒

          • APP_DATA_RETENTION_SECONDS:应用文件夹/数据的过期时间。由参数 spark.worker.cleanup.appDataTtl 配置,默认为 7 天

          • master:ALIVE 状态的 Master 的代理对象

          • registered:标识该 Worker 是否注册成功

          • drivers:当前 Worker 上的 Driver 集合

          • executors:当前 Worker 上的 Executor 集合

          • shuffleService:shuffle 服务对象。后面会进行单独分析

          • coreUsed:已使用的 CPU 个数

          • memoryUsed:已使用的内存大小

              由于 Worker 是真正执行任务的,所以它会记录自身的可用资源,以便向 Master 报告

          开启 Worker 生命周期:onStart 方法
            override def onStart() {
            //验证该Worker未注册
            assert(!registered)
            logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
            host, port, cores, Utils.megabytesToString(memory)))
            logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
            logInfo("Spark home: " + sparkHome)


            //TODO 第一步:创建当前Worker的工作目录
            createWorkDir()


            //TODO 第二步:启动shuffle服务
            startExternalShuffleService()


            //TODO 第三步:启动WebUI,绑定端口
            webUi = new WorkerWebUI(this, workDir, webUiPort)
            webUi.bind()
            workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"


            //TODO 第四步:向Master注册
            registerWithMaster()


            //TODO 第五步:注册指标监控系统并启动
            metricsSystem.registerSource(workerSource)
            metricsSystem.start()
            metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
            }

            onStart 方法主要分为五步:

            • 第一步:创建当前 Worker 的工作目录。也就是在 SPARK_HOME 目录下创建一个 work 目录

            • 第二步:启动 shuffle 服务

            • 第三步:启动 WebUI,绑定端口

            • 第四步:向 Master 注册

            • 第五步:注册指标监控系统并启动

                这里重点分析如何向 Master 进行注册,至于如何启动 shuffle 服务,后面单独分析

            向 Master 注册调用了 registerWithMaster 方法:

              private def registerWithMaster() {


              registrationRetryTimer match {
              case None =>
              //如果还未注册
              registered = false
              //TODO 开启注册,多次重试直到注册成功
              registerMasterFutures = tryRegisterAllMasters()
              connectionAttemptCount = 0


              //如果没有注册成功,就启动一个定时任务,每隔一段时间([5,15]之间的随机整数)重新注册一次
              registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
              new Runnable {
              override def run(): Unit = Utils.tryLogNonFatalError {
              Option(self).foreach(_.send(ReregisterWithMaster))
              }
              },
              //[5,15]之间的随机整数
              INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
              INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
              TimeUnit.SECONDS))
              case Some(_) =>
              logInfo("Not spawning another attempt to register with the master, since there is an" +
              " attempt scheduled already.")
              }
              }
              • 如果从未注册过,则启动注册;

              • 如果没有注册成功,则启动一个定时任务,每隔一段时间给自己发送一个 ReregisterWithMaster 类型的消息

              启动注册:tryRegisterAllMasters

                private def tryRegisterAllMasters(): Array[JFuture[_]] = {
                //向每个Master提交注册信息
                masterRpcAddresses.map { masterAddress =>
                //往线程池中提交了一个注册的任务
                registerMasterThreadPool.submit(new Runnable {
                override def run(): Unit = {
                try {
                logInfo("Connecting to master " + masterAddress + "...")


                //获取到MasterEndPointRef代理对象
                val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)


                //发送RegisterWorker注册消息给Master
                sendRegisterMessageToMaster(masterEndpoint)
                } catch {
                case ie: InterruptedException => // Cancelled
                case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
                }
                }
                })
                }
                }

                    该方法会遍历所有 Master 地址,给每个 Master 往 registerMasterThreadPool 线程池中提交一个任务,任务的功能就是向 Master 发送 RegisterWorker 注册消息

                Master 接收注册消息的处理逻辑如下:

                  case RegisterWorker(
                  id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
                  logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
                  workerHost, workerPort, cores, Utils.megabytesToString(memory)))


                  //如果是STANDBY状态的Master
                  if (state == RecoveryState.STANDBY) {


                  //什么都不做,给Worker返回一个StandBy
                  workerRef.send(MasterInStandby)


                  //判断该Worker是否注册过
                  } else if (idToWorker.contains(id)) {


                  //如果注册过了,就返回一个RegisterWorkerFailed消息
                  workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))


                  //如果没有注册过,则完成注册
                  } else {


                  //初始化一个WorkerInfo对象,存储了Worker的信息
                  val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
                  workerRef, workerWebUiUrl)


                  //执行Worker的注册,将Worker信息放入各种数据结构中
                  //如果注册成功
                  if (registerWorker(worker)) {


                  //把Worker信息写到zk集群
                  persistenceEngine.addWorker(worker)


                  //给Worker发送RegisteredWorker注册成功的消息
                  workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))


                  //Worker加入工作,当Worker加入时,可用资源必定增加,那么就将等待执行的Driver,Executor等启动起来执行任务
                  schedule()
                  } else {
                  val workerAddress = worker.endpoint.address
                  logWarning("Worker registration failed. Attempted to re-register worker at same " +
                  "address: " + workerAddress)
                  workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
                  + workerAddress))
                  }
                  }
                  逻辑如下:
                  • 如果 Master 的状态为 STANDBY,则给 Worker 返回一个 MasterInStandby 消息,Worker 接收该类型消息什么都不会做
                  • 如果 Master 状态为 ALIVE,但该 Worker 已经注册过,就返回一个 RegisterWorkerFailed 类型的消息,Worker 接收该类型消息后,如果 Worker 已经注册过,则什么都不做,如果没有注册过则记录错误日志
                  • 如果 Master 状态为 ALIVE,且该 Worker 没有注册过,则完成 Worker 注册:
                    • 第一步:将 Worker 信息封装成一个 WorkerInfo 对象
                    • 第二步:进行 Worker 注册,即将 Worker 信息保存到 Master 的各种数据结构中
                    • 第三步:如果注册成功,则将 Worker 信息写到 zk 集群
                    • 第四步:给 Worker 发送 RegisteredWorker 类型的消息
                    • 第五步:调度资源,让等待资源的 Driver 和 Executor 执行任务
                  Worker 接收 RegisteredWorker 类型消息的处理方法是:handleRegisterResponse
                    private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
                    msg match {


                    //接收到注册成功的消息
                    case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress) =>
                    if (preferConfiguredMasterAddress) {
                    logInfo("Successfully registered with master " + masterAddress.toSparkURL)
                    } else {
                    logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
                    }


                    //标记注册成功
                    registered = true


                    //Worker会同时给两个Master发送注册信息,如果Master节点发生变更,那么这里就会更新Master的信息
                    changeMaster(masterRef, masterWebUiUrl, masterAddress)


                    //Worker注册成功后,启动定时任务,每隔15秒执行一次,给自己发送一个SendHeartbeat消息
                    forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
                    override def run(): Unit = Utils.tryLogNonFatalError {
                    self.send(SendHeartbeat)
                    }
                    }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
                    //如果配置了定期清理工作目录,则启动定时任务,延时30分钟后,每隔30分钟执行一次
                    if (CLEANUP_ENABLED) {
                    logInfo(
                    s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
                    forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
                    override def run(): Unit = Utils.tryLogNonFatalError {
                    self.send(WorkDirCleanup)
                    }
                    }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
                    }


                    val execs = executors.values.map { e =>
                    new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
                    }
                    //给Master发送WorkerLatestState类型的消息
                    masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))


                    ...
                    }
                    }

                    如果接收到注册成功的消息,会执行以下步骤:

                    • 第一步:更新 Worker 的 registered 变量,标记注册成功

                    • 第二步:更新 Master 信息。Worker会同时给两个Master发送注册信息,如果Master节点发生变更,那么这里就会更新Master的信息

                    • 第三步:启动心跳定时任务,每隔 15 秒给自己发送一个 SendHeartbeat 消息,自身接收到该消息,会给 Master 发送 Heartbeat 类型的消息

                    • 第四步:如果开启了定期清理工作目录,则启动定时任务,延时30分钟后,每隔30分钟执行一次清理任务

                    • 第五步:将所有 Executor 封装成 ExecutorDescription 对象,然后向 Master 发送 WorkerLatestState 类型的消息

                        上面向 Master 发送了两种类型的消息:Heartbeat WorkerLatestState ,下面看 Master 是如何处理的

                    Heartbeat :

                      case Heartbeat(workerId, worker) =>


                      //根据WorkerID获取Worker信息
                      idToWorker.get(workerId) match {


                      //如果获取到了则更新最后一次心跳的时间
                      case Some(workerInfo) =>
                      workerInfo.lastHeartbeat = System.currentTimeMillis()


                      //如果没有获取到,说明还没有注册 则给Worker发送ReconnectWorker消息
                      case None =>
                      if (workers.map(_.id).contains(workerId)) {
                      logWarning(s"Got heartbeat from unregistered worker $workerId." +
                      " Asking it to re-register.")
                      worker.send(ReconnectWorker(masterUrl))
                      } else {
                      logWarning(s"Got heartbeat from unregistered worker $workerId." +
                      " This worker was never registered, so ignoring the heartbeat.")
                      }
                      }

                      如果是 Heartbeat 类型的消息,Master 会判断该 Worker 是否注册:

                      • 如果注册了,会更新最后一个心跳的时间

                      • 如果没有注册,会给 Worker 返回一个 ReconnectWorker 类型的消息。Worker 处理该消息的逻辑就是调用上面的 registerWithMaster 方法进行注册

                      WorkerLatestState :

                        case WorkerLatestState(workerId, executors, driverIds) =>
                        //根据workerId获取Worker信息
                        idToWorker.get(workerId) match {
                        //如果获取到了指定的 Worker 信息
                        case Some(worker) =>
                        //遍历所有的Executor信息,看Master保存的Worker信息中是否包含该Executor信息,
                        // 如果不包含,则给Worker发送KillExecutor消息,让其关闭该Executor
                        for (exec <- executors) {
                        val executorMatches = worker.executors.exists {
                        case (_, e) => e.application.id == exec.appId && e.id == exec.execId
                        }
                        if (!executorMatches) {
                        // master doesn't recognize this executor. So just tell worker to kill it.
                        worker.endpoint.send(KillExecutor(masterUrl, exec.appId, exec.execId))
                        }
                        }
                        //遍历所有的Driver信息,看Master保存的Worker信息中是否包含该Driver信息,
                        // 如果不包含,则给Worker发送KillDriver消息,让其关闭该Driver
                        for (driverId <- driverIds) {
                        val driverMatches = worker.drivers.exists { case (id, _) => id == driverId }
                        if (!driverMatches) {
                        // master doesn't recognize this driver. So just tell worker to kill it.
                        worker.endpoint.send(KillDriver(driverId))
                        }
                        }
                        //如果没有获取到则记录日志
                        case None =>
                        logWarning("Worker state from unknown worker: " + workerId)
                        }

                            该分支的作用是检查当前 Worker 上的 Executor 和 Driver 是否和 Master 端记录的一致,如果不一致则关闭 Executor 和 Driver

                        注意:从这里也可以看出,在 Master 端维护的 WorkerInfo 信息中,包含了该 Worker 上运行的所有 Executor 和 Driver 信息

                        最后看一下如果 Worker 注册不成功,如何重新注册。调用的是

                        Worker.reregisterWithMaster 方法

                          private def reregisterWithMaster(): Unit = {
                          Utils.tryOrExit {
                          //更新尝试注册的次数
                          connectionAttemptCount += 1
                          //如果已经注册成功,取消最后一次重新注册的操作
                          if (registered) {
                          cancelLastRegistrationRetry()
                          //如果重试次数小于阈值(默认16)
                          } else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) {
                          logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)")


                          master match {
                          case Some(masterRef) =>
                          //如果Worker中保存了Master信息,但是注册失败了,说明失去了和Master的连接,所以代理对象不可用,需要重新创建代理对象
                          if (registerMasterFutures != null) {
                          registerMasterFutures.foreach(_.cancel(true))
                          }
                          //重新获取Master代理对象
                          val masterAddress =
                          if (preferConfiguredMasterAddress) masterAddressToConnect.get else masterRef.address
                          //重新向线程池添加注册Worker的任务
                          registerMasterFutures = Array(registerMasterThreadPool.submit(new Runnable {
                          override def run(): Unit = {
                          try {
                          logInfo("Connecting to master " + masterAddress + "...")
                          val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
                          //给Master发送RegisterWorker消息
                          sendRegisterMessageToMaster(masterEndpoint)
                          } catch {
                          case ie: InterruptedException => // Cancelled
                          case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
                          }
                          }
                          }))
                          case None =>
                          if (registerMasterFutures != null) {
                          registerMasterFutures.foreach(_.cancel(true))
                          }
                          // We are retrying the initial registration
                          //尝试进行注册
                          registerMasterFutures = tryRegisterAllMasters()
                          }
                          // We have exceeded the initial registration retry threshold
                          // All retries from now on should use a higher interval
                          //如果达到了初始的重试次数阈值(默认为6),那么应该加大重试间隔
                          if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
                          //取消注册重试计时
                          registrationRetryTimer.foreach(_.cancel(true))
                          //重新设置注册的重试计时,重试间隔增大到[30,90]之间的一个随机数,假设为m
                          //那么延时m秒后,每隔m秒重试一次
                          registrationRetryTimer = Some(
                          forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
                          override def run(): Unit = Utils.tryLogNonFatalError {
                          self.send(ReregisterWithMaster)
                          }
                          }, PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
                          PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
                          TimeUnit.SECONDS))
                          }
                          } else {
                          //如果超过重试次数阈值,则记录错误日志
                          logError("All masters are unresponsive! Giving up.")
                          System.exit(1)
                          }
                          }
                          }

                          该方法的逻辑是:

                          • 第一步:更新重试注册的计数器

                          • 第二步:如果已经注册成功了,那么取消最后一次的重新注册操作

                          • 第三步:如果超过了注册重试的总次数(默认16次),则记录错误日志

                          • 第四步:如果没有超过注册重试的总次数,判断是否有 Master 代理信息,如果没有,且没有注册失败的信息,则尝试进行初始注册

                          • 第五步:如果有 Master 代理信息,但是注册失败了,说明失去了和 Master 的连接,那么需要重新获取 Master 的代理对象。然后使用重新获取的代理对象,向 Master 发送注册消息(这里是将注册的任务重新添加到管理注册线程的线程池里面)

                          • 第六步:如果注册重试的次数达到了初始的重试次数阈值(默认6次),则重新设置注册的重试计时,增大重试的间隔时间,这个时间是 [30,90] 之前的一个随机整数,假设为m,单位是秒。然后启动定时任务,在延时 m 秒之后,每个 m 秒重新注册一次,直到达到总的重试次数。

                          文章转载自大数据记事本,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                          评论