之前的三篇文章:
Spark2.x精通:Standalone模式Master节点启动源码剖析
Spark2.x精通:Master端循环消息处理源码剖析(一)
Spark2.x精通:Master端循环消息处理源码剖析(二)
分析Standalone模式下Master节点的启动流程及循环消息处理机制,基本把Master代码分析的比较透彻了,下面我们就剖析下Worker节点的启动流程和相关消息处理机制:
源码版本:
Spark2.2.0
1.Worker启动脚本为start-worker.sh,最后日志里面打出来的启动日志:
//启动脚本命令/home/cuadmin/ljs/spark2.2.1/sbin/start-worker.sh/日志中打出来的,最后拼写后的命令usr/jdk64/jdk1.8.0_112/bin/java -cp home/cuadmin/ljs/spark2.2.1/conf/:/home/cuadmin/ljs/spark2.2.1/jars/* -Xmx1g org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://salver158.hadoop.unicom:7077
Worker启动脚本里面的脚本除了加载了一些配置主要就是调用了org.apache.spark.deploy.worker.Worker
这个类,我们直接从Worker.scala文件的的main()开始看:
def main(argStrings: Array[String]) {//同样是初始化了日志对象Utils.initDaemon(log)val conf = new SparkConf//根据用户指定的配置,覆盖默认的配置,如端口、cpu、内存等信息,实例化为argsval args = new WorkerArguments(argStrings, conf)//跟Master节点一样调用startRpcEnvAndEndpoint,创建一个RpcEnv和Endpointval rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,args.memory, args.masters, args.workDir, conf = conf)rpcEnv.awaitTermination()}
2.这个main函数就一行代码比较重要,就是startRpcEnvAndEndpoint()函数的调用,这一行其实跟Master节点基本一致,这里用到了Master的地址、端口,因为后期Worker是要跟Master进行通信的,比如Worker的注册;详细看下他的源代码:
def startRpcEnvAndEndpoint(host: String,port: Int,webUiPort: Int,cores: Int,memory: Int,masterUrls: Array[String],workDir: String,workerNumber: Option[Int] = None,conf: SparkConf = new SparkConf): RpcEnv = {// The LocalSparkCluster runs multiple local sparkWorkerX RPC Environmentsval systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")val securityMgr = new SecurityManager(conf)// 创建rpcEnv和endpoint,注意这里创建endpoint用到了Master的ip和端口信息// 以及Worker自身分配的cpu、内存信息val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))rpcEnv}
3.和Master一样,由于继承了ThreadSafeRpcEndpoint,重写里面的onStart方法,在启动的时候会执行onStart方法,源码如下:
override def onStart() {assert(!registered)logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(host, port, cores, Utils.megabytesToString(memory)))logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")logInfo("Spark home: " + sparkHome)//创建Worker工作目录,是伸缩空间和日志输入的目录路径//这里有三个相关的参数有兴趣可以了解下,控制目录文件大小://spark.executor.logs.rolling.strategy//spark.executor.logs.rolling.maxSize 134217728 #default byte//spark.executor.logs.rolling.maxRetainedFilescreateWorkDir()//如果用户开启Shffle服务,则启动该服务, External shuffle Service是长期存在于NodeManager进程中的一个辅助服务。// 通过该服务来抓取shuffle数据,减少了Executor的压力,// 在Executor GC的时候也不会影响其他Executor的任务运行。//这个服务由参数spark.shuffle.service.enabled控制 默认为false 不开启shuffleService.startIfEnabled()//实例化Worker的UI界面,绑定端口webUi = new WorkerWebUI(this, workDir, webUiPort)webUi.bind()workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"//这里比较重要,向Master进行注册registerWithMaster()//监控相关,这里不讲解metricsSystem.registerSource(workerSource)metricsSystem.start()// Attach the worker metrics servlet handler to the web ui after the metrics system is started.metricsSystem.getServletHandlers.foreach(webUi.attachHandler)}
我们先看第18行,启动shuffle服务代码,跳转到最后调用的是ExternalShuffleService.scala中的start()函数:
4.ExternalShuffleService服务启动
Spark系统在运行含shuffle过程的应用时,Executor进程除了运行task,还要负责写shuffle数据,给其他Executor提供shuffle数据。当Executor进程任务过重,导致GC而不能为其他Executor提供shuffle数据时,会影响任务运行。External shuffle Service是长期存在于NodeManager进程中的一个辅助服务。通过该服务来抓取shuffle数据,减少了Executor的压力,在Executor GC的时候也不会影响其他Executor的任务运行。
/** Start the external shuffle service */def start() {require(server == null, "Shuffle server already started")val authEnabled = securityManager.isAuthenticationEnabled()logInfo(s"Starting shuffle service on port $port (auth enabled = $authEnabled)")val bootstraps: Seq[TransportServerBootstrap] =if (authEnabled) {Seq(new AuthServerBootstrap(transportConf, securityManager))} else {Nil}server = transportContext.createServer(port, bootstraps.asJava)masterMetricsSystem.registerSource(shuffleServiceSource)masterMetricsSystem.start()}
5.Worker向Master注册,调用registerWithMaster()函数,代码如下:
private def registerWithMaster() {// onDisconnected may be triggered multiple times, so don't attempt registration// if there are outstanding registration attempts scheduled.registrationRetryTimer match {case None =>registered = false//主要是这行根据master地址和端口,向Master进行注册registerMasterFutures = tryRegisterAllMasters()connectionAttemptCount = 0// 开启一个定时器、如果上面的tryRegisterAllMasters注册失败,那么registered字段就不为TRUE//这里就判断registered字段,重试直到最大次数后,放弃重试.registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(new Runnable {override def run(): Unit = Utils.tryLogNonFatalError {Option(self).foreach(_.send(ReregisterWithMaster))}},INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,TimeUnit.SECONDS))case Some(_) =>logInfo("Not spawning another attempt to register with the master, since there is an" +" attempt scheduled already.")}}
这里直接看tryRegisterAllMasters()函数:
private def tryRegisterAllMasters(): Array[JFuture[_]] = {//循环该worker的构造参数属性masterRpcAddresses,这是一个存放着RpcAddress的集合//然后使用一个线程池(个数=masterRpcAddresses个数)来多线程注册Master//这里向所有Master进行注册,由Master的代码我们知道,只有Active的Master才会返回消息,其他不返回消息//这个masterRpcAddresses是在startRpcEnvAndEndpoint函数中调用rpcEnv.setupEndpoint的时候//会new Worker传进来的参数masterRpcAddresses.map { masterAddress =>registerMasterThreadPool.submit(new Runnable {override def run(): Unit = {try {logInfo("Connecting to master " + masterAddress + "...")//根据masterEndPoint地址 和名称 向Master进行注册val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)sendRegisterMessageToMaster(masterEndpoint)} catch {case ie: InterruptedException => // Cancelledcase NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)}}})}}
最后就是调用sendRegisterMessageToMaster()向Master发送RegisterWorker消息,这个消息会在Maser.scala文件的receive()函数中进行处理,完成与Master的通信。
private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = {// 发送消息 这里参数有唯一的id, 主机名 端口 cpu 内存等信息masterEndpoint.send(RegisterWorker(workerId,host,port,self,cores,memory,workerWebUiUrl,masterEndpoint.address))}
6.Worker里面还有几个比较重要的东西,这里说明:
1).Worker向Master注册完成后,会启动一个定时任务,定时向Master发送心跳,说明自己还活着,主要就是下面这行代码:
changeMaster(masterRef, masterWebUiUrl, masterAddress)forwordMessageScheduler.scheduleAtFixedRate(new Runnable {override def run(): Unit = Utils.tryLogNonFatalError {self.send(SendHeartbeat)}}, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
2).还有一个就是Worker启动时会新建本地工作目录,它会启动一个定时任务去清理本地工作目录,代码如下:
forwordMessageScheduler.scheduleAtFixedRate(new Runnable {override def run(): Unit = Utils.tryLogNonFatalError {self.send(WorkDirCleanup)}}, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
以上两个内容我会在下一篇文章中讲到,这里也提前提一下。
这里Worker的启动基本结束了,总结一下,它主要是进行了Rpc环境的创建和初始化,初始化Worker实例,包含名称、地址、端口、cpu、内存信息,然后向Master进行注册,接下来我们将分析Worker端的消息处理。
如果觉得我的文章能帮到您,请关注微信公众号“大数据开发运维架构”,并转发朋友圈,谢谢支持!!!





