暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Spark2.x精通:Worker节点启动源码剖析

大数据开发运维架构 2020-03-05
168

    之前的三篇文章:

     Spark2.x精通:Standalone模式Master节点启动源码剖析

     Spark2.x精通:Master端循环消息处理源码剖析(一)

     Spark2.x精通:Master端循环消息处理源码剖析(二)

    分析Standalone模式下Master节点的启动流程及循环消息处理机制,基本把Master代码分析的比较透彻了,下面我们就剖析下Worker节点的启动流程和相关消息处理机制:

源码版本:

    Spark2.2.0

1.Worker启动脚本为start-worker.sh,最后日志里面打出来的启动日志

    //启动脚本命令
    /home/cuadmin/ljs/spark2.2.1/sbin/start-worker.sh
    /日志中打出来的,最后拼写后的命令
    usr/jdk64/jdk1.8.0_112/bin/java -cp home/cuadmin/ljs/spark2.2.1/conf/:/home/cuadmin/ljs/spark2.2.1/jars/* -Xmx1g org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://salver158.hadoop.unicom:7077

     

        Worker启动脚本里面的脚本除了加载了一些配置主要就是调用了org.apache.spark.deploy.worker.Worker
    这个类,我们直接从Worker.scala文件的的main()开始看:

      def main(argStrings: Array[String]) {
          //同样是初始化了日志对象
      Utils.initDaemon(log)
      val conf = new SparkConf
          //根据用户指定的配置,覆盖默认的配置,如端口、cpu、内存等信息,实例化为args
      val args = new WorkerArguments(argStrings, conf)
          //跟Master节点一样调用startRpcEnvAndEndpoint,创建一个RpcEnv和Endpoint
      val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
      args.memory, args.masters, args.workDir, conf = conf)
      rpcEnv.awaitTermination()
      }

         

      2.这个main函数就一行代码比较重要,就是startRpcEnvAndEndpoint()函数的调用,这一行其实跟Master节点基本一致,这里用到了Master的地址、端口,因为后期Worker是要跟Master进行通信的,比如Worker的注册;详细看下他的源代码:

        def startRpcEnvAndEndpoint(
        host: String,
        port: Int,
        webUiPort: Int,
        cores: Int,
        memory: Int,
        masterUrls: Array[String],
        workDir: String,
        workerNumber: Option[Int] = None,
        conf: SparkConf = new SparkConf): RpcEnv = {

        // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
        val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
        val securityMgr = new SecurityManager(conf)
            // 创建rpcEnv和endpoint,注意这里创建endpoint用到了Master的ip和端口信息
            // 以及Worker自身分配的cpu、内存信息
        val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
        val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
        rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
        masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
        rpcEnv
        }

        3.和Master一样,由于继承了ThreadSafeRpcEndpoint,重写里面的onStart方法,在启动的时候会执行onStart方法,源码如下:

          override def onStart() {
          assert(!registered)
          logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
          host, port, cores, Utils.megabytesToString(memory)))
          logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
          logInfo("Spark home: " + sparkHome)
             
              //创建Worker工作目录,是伸缩空间和日志输入的目录路径
              //这里有三个相关的参数有兴趣可以了解下,控制目录文件大小:
              //spark.executor.logs.rolling.strategy 
              //spark.executor.logs.rolling.maxSize 134217728 #default byte 
              //spark.executor.logs.rolling.maxRetainedFiles
          createWorkDir()
              //如果用户开启Shffle服务,则启动该服务,   External shuffle Service是长期存在于NodeManager进程中的一个辅助服务。
              // 通过该服务来抓取shuffle数据,减少了Executor的压力,
              // 在Executor GC的时候也不会影响其他Executor的任务运行。
              //这个服务由参数spark.shuffle.service.enabled控制 默认为false 不开启
          shuffleService.startIfEnabled()
             //实例化Worker的UI界面,绑定端口
          webUi = new WorkerWebUI(this, workDir, webUiPort)
              webUi.bind()
              
          workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
              //这里比较重要,向Master进行注册
              registerWithMaster()
              //监控相关,这里不讲解
          metricsSystem.registerSource(workerSource)
          metricsSystem.start()
          // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
          metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
            }

              我们先看第18行,启动shuffle服务代码,跳转到最后调用的是ExternalShuffleService.scala中的start()函数:

          4.ExternalShuffleService服务启动

               Spark系统在运行含shuffle过程的应用时,Executor进程除了运行task,还要负责写shuffle数据,给其他Executor提供shuffle数据。当Executor进程任务过重,导致GC而不能为其他Executor提供shuffle数据时,会影响任务运行。External shuffle Service是长期存在于NodeManager进程中的一个辅助服务。通过该服务来抓取shuffle数据,减少了Executor的压力,在Executor GC的时候也不会影响其他Executor的任务运行。

            /** Start the external shuffle service */
            def start() {
            require(server == null, "Shuffle server already started")
            val authEnabled = securityManager.isAuthenticationEnabled()
            logInfo(s"Starting shuffle service on port $port (auth enabled = $authEnabled)")
            val bootstraps: Seq[TransportServerBootstrap] =
            if (authEnabled) {
            Seq(new AuthServerBootstrap(transportConf, securityManager))
            } else {
            Nil
            }
            server = transportContext.createServer(port, bootstraps.asJava)

            masterMetricsSystem.registerSource(shuffleServiceSource)
            masterMetricsSystem.start()
            }

            5.Worker向Master注册,调用registerWithMaster()函数,代码如下

              private def registerWithMaster() {
              // onDisconnected may be triggered multiple times, so don't attempt registration
              // if there are outstanding registration attempts scheduled.
              registrationRetryTimer match {
              case None =>
              registered = false
                      //主要是这行根据master地址和端口,向Master进行注册
              registerMasterFutures = tryRegisterAllMasters()
              connectionAttemptCount = 0
              // 开启一个定时器、如果上面的tryRegisterAllMasters注册失败,那么registered字段就不为TRUE
              //这里就判断registered字段,重试直到最大次数后,放弃重试.
              registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
              new Runnable {
              override def run(): Unit = Utils.tryLogNonFatalError {
              Option(self).foreach(_.send(ReregisterWithMaster))
              }
              },
              INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
              INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
              TimeUnit.SECONDS))
              case Some(_) =>
              logInfo("Not spawning another attempt to register with the master, since there is an" +
              " attempt scheduled already.")
              }
              }

                  这里直接看tryRegisterAllMasters()函数:

                private def tryRegisterAllMasters(): Array[JFuture[_]] = {
                //循环该worker的构造参数属性masterRpcAddresses,这是一个存放着RpcAddress的集合
                //然后使用一个线程池(个数=masterRpcAddresses个数)来多线程注册Master
                    //这里向所有Master进行注册,由Master的代码我们知道,只有Active的Master才会返回消息,其他不返回消息
                    //这个masterRpcAddresses是在startRpcEnvAndEndpoint函数中调用rpcEnv.setupEndpoint的时候
                    //会new Worker传进来的参数
                    masterRpcAddresses.map { masterAddress =>
                registerMasterThreadPool.submit(new Runnable {
                override def run(): Unit = {
                try {
                logInfo("Connecting to master " + masterAddress + "...")
                            //根据masterEndPoint地址 和名称 向Master进行注册
                           val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
                sendRegisterMessageToMaster(masterEndpoint)
                } catch {
                case ie: InterruptedException => // Cancelled
                case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
                }
                }
                })
                }
                }

                    

                    最后就是调用sendRegisterMessageToMaster()向Master发送RegisterWorker消息,这个消息会在Maser.scala文件的receive()函数中进行处理,完成与Master的通信。

                   private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = {
                     // 发送消息 这里参数有唯一的id, 主机名 端口 cpu 内存等信息
                      masterEndpoint.send(RegisterWorker(
                  workerId,
                  host,
                  port,
                  self,
                  cores,
                  memory,
                  workerWebUiUrl,
                  masterEndpoint.address))
                  }

                     

                  6.Worker里面还有几个比较重要的东西,这里说明:

                      1).Worker向Master注册完成后,会启动一个定时任务,定时向Master发送心跳,说明自己还活着,主要就是下面这行代码:

                     changeMaster(masterRef, masterWebUiUrl, masterAddress)
                    forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
                    override def run(): Unit = Utils.tryLogNonFatalError {
                    self.send(SendHeartbeat)
                    }
                    }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)

                        2).还有一个就是Worker启动时会新建本地工作目录,它会启动一个定时任务去清理本地工作目录,代码如下:

                       forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
                      override def run(): Unit = Utils.tryLogNonFatalError {
                      self.send(WorkDirCleanup)
                      }
                      }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)

                         

                          以上两个内容我会在下一篇文章中讲到,这里也提前提一下。


                           

                          这里Worker的启动基本结束了,总结一下,它主要是进行了Rpc环境的创建和初始化,初始化Worker实例,包含名称、地址、端口、cpu、内存信息,然后向Master进行注册,接下来我们将分析Worker端的消息处理。


                          如果觉得我的文章能帮到您,请关注微信公众号“大数据开发运维架构”,并转发朋友圈,谢谢支持!


                      文章转载自大数据开发运维架构,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                      评论