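This article walks through how a Spark Executor is created and how it executes tasks: the Executor is created after its backend registers with the driver, and tasks are then run on the Executor's thread pool.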

01
—
Executor startup: registering with the driver
1. The onStart method of class CoarseGrainedExecutorBackend registers with the driver.
override def onStart(): Unit = {
  if (env.conf.get(DECOMMISSION_ENABLED)) {
    logInfo("Registering PWR handler to trigger decommissioning.")
    SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
      "disabling executor decommission feature.") (self.askSync[Boolean](ExecutorSigPWRReceived))
  }

  logInfo("Connecting to driver: " + driverUrl)
  try {
    _resources = parseOrFindResources(resourcesFileOpt)
  } catch {
    case NonFatal(e) =>
      exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
  }
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    // Send the RegisterExecutor message to register with the Driver process.
    // Despite the message name, what gets registered is the ExecutorBackend,
    // not the Executor; the Executor is only a member object of
    // CoarseGrainedExecutorBackend.
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
      extractAttributes, _resources, resourceProfile.id))
  }(ThreadUtils.sameThread).onComplete {
    case Success(_) =>
      self.send(RegisteredExecutor)
    case Failure(e) =>
      exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
  }(ThreadUtils.sameThread)
}

// Executors to driver
case class RegisterExecutor(
    executorId: String,
    executorRef: RpcEndpointRef,
    hostname: String,
    cores: Int,
    logUrls: Map[String, String],
    attributes: Map[String, String],
    resources: Map[String, ResourceInformation],
    resourceProfileId: Int)
  extends CoarseGrainedClusterMessage
2. CoarseGrainedClusterMessage is a trait.
private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
3. CoarseGrainedExecutorBackend is the name of the process in which the Executor runs. The Executor is the object that actually processes Tasks, and internally it does so with a thread pool. Because CoarseGrainedExecutorBackend has a main method, launching it creates a separate process.
def main(args: Array[String]): Unit = {
  val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
    CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
    new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
      arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
      env, arguments.resourcesFileOpt, resourceProfile)
  }
  run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
  System.exit(0)
}
4. CoarseGrainedExecutorBackend and Executor are in one-to-one correspondence.
private[spark] class CoarseGrainedExecutorBackend(
    override val rpcEnv: RpcEnv,
    driverUrl: String,
    executorId: String,
    bindAddress: String,
    hostname: String,
    cores: Int,
    userClassPath: Seq[URL],
    env: SparkEnv,
    resourcesFileOpt: Option[String],
    resourceProfile: ResourceProfile)
  extends IsolatedRpcEndpoint with ExecutorBackend with Logging {
5. CoarseGrainedExecutorBackend is a message endpoint (concretely, it implements IsolatedRpcEndpoint): it can send messages to the Driver and receive instructions from the Driver, such as launching a Task.
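The registration handshake in onStart can be modeled without Spark as a future-based "ask" followed by a self-sent message. The sketch below is only a toy model under that assumption: ToyDriver, ToyBackend, and their methods are hypothetical names, not Spark APIs, and the failure path simply leaves the backend unstarted where Spark would call exitExecutor.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object RegistrationSketch {
  // Hypothetical stand-ins for the messages exchanged during registration.
  final case class RegisterExecutor(executorId: String, hostname: String, cores: Int)
  case object RegisteredExecutor

  // Toy driver: answers an "ask" with a Future, loosely like ref.ask[Boolean](...).
  class ToyDriver {
    private val registered = scala.collection.mutable.HashSet.empty[String]
    def ask(msg: RegisterExecutor): Future[Boolean] = synchronized {
      if (registered.add(msg.executorId)) Future.successful(true)
      else Future.failed(new IllegalStateException(s"Duplicate executor ID: ${msg.executorId}"))
    }
  }

  // Toy backend: registers on start and, on success, delivers RegisteredExecutor to itself.
  class ToyBackend(executorId: String, driver: ToyDriver)(implicit ec: ExecutionContext) {
    @volatile var executorStarted = false

    def receive(msg: Any): Unit = msg match {
      case RegisteredExecutor => executorStarted = true // Spark creates the Executor here
      case other => println(s"ignored: $other")
    }

    def onStart(): Future[Unit] =
      driver.ask(RegisterExecutor(executorId, "localhost", 2)).transform {
        case Success(_) => Success(receive(RegisteredExecutor))
        case Failure(e) => Failure(e) // Spark calls exitExecutor at this point
      }
  }

  // Starts two backends with the same ID; only the first one should register.
  def demo(): (Boolean, Boolean) = {
    import ExecutionContext.Implicits.global
    val driver = new ToyDriver
    val first = new ToyBackend("1", driver)
    val duplicate = new ToyBackend("1", driver)
    Await.ready(first.onStart(), 5.seconds)
    Await.ready(duplicate.onStart(), 5.seconds)
    (first.executorStarted, duplicate.executorStarted)
  }
}
```

Calling RegistrationSketch.demo() starts two backends that claim the same executor ID; the first ends up with a started executor and the second does not, which mirrors the duplicate-ID rejection on the driver side.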
02
—
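The Driver process receives the message
1. The Driver process has two important endpoints:
(1) ClientEndpoint: mainly responsible for registering the current application with the Master. It is an inner member of AppClient (named StandaloneAppClient in recent Spark versions).
ClientEndpoint, the inner class of AppClient: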
private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging
(2) DriverEndpoint: the driver of the whole application at run time. It is an inner member of CoarseGrainedSchedulerBackend.
The class
private[spark] class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
  extends ExecutorAllocationClient with SchedulerBackend with Logging
has the inner class DriverEndpoint:
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
  extends ThreadSafeRpcEndpoint with Logging
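2. The Driver wraps each ExecutorBackend's information in an ExecutorData object and registers it in the Driver's in-memory data structure executorDataMap.
3. At run time, DriverEndpoint writes this information into executorDataMap, which is a field of CoarseGrainedSchedulerBackend, so the registration ultimately lands in CoarseGrainedSchedulerBackend. In other words, CoarseGrainedSchedulerBackend keeps track of every ExecutorBackend process allocated to the current application, and inside each ExecutorBackend process an Executor object is responsible for actually running Tasks. The write to executorDataMap is wrapped in a synchronized block so that concurrent access stays safe.
4. Once registration succeeds, CoarseGrainedExecutorBackend handles the RegisteredExecutor message (in the onStart code shown earlier, the backend sends this message to itself after the driver's reply arrives) and creates the Executor instance, which is what actually performs the Task computation.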
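The driver-side bookkeeping described above can be sketched as a map guarded by synchronized. This is a simplified model, not Spark's code: ToySchedulerBackend and ToyExecutorData are hypothetical names, and unlike the Spark source, which only wraps the map update in the lock, the toy also takes the lock for the duplicate check because it is called directly from several threads.

```scala
import scala.collection.mutable.HashMap

object RegistrySketch {
  // Hypothetical, trimmed-down version of the per-executor record.
  final case class ToyExecutorData(hostname: String, totalCores: Int, var freeCores: Int)

  class ToySchedulerBackend {
    private val executorDataMap = new HashMap[String, ToyExecutorData]

    // Returns Right(()) on success, Left(reason) when the ID is already taken.
    def register(executorId: String, hostname: String, cores: Int): Either[String, Unit] =
      this.synchronized {
        if (executorDataMap.contains(executorId)) {
          Left(s"Duplicate executor ID: $executorId")
        } else {
          executorDataMap.put(executorId, ToyExecutorData(hostname, cores, cores))
          Right(())
        }
      }

    def totalCores: Int = this.synchronized(executorDataMap.values.map(_.totalCores).sum)
    def size: Int = this.synchronized(executorDataMap.size)
  }

  // Registers IDs 0 until n from n concurrent threads and returns the backend.
  def registerConcurrently(n: Int): ToySchedulerBackend = {
    val backend = new ToySchedulerBackend
    val threads = (0 until n).map { i =>
      new Thread(() => { backend.register(i.toString, s"host-$i", 4); () })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    backend
  }
}
```

A plain HashMap is not safe for concurrent writers, which is why every access in the toy goes through the same lock; removing the synchronized wrappers would make the concurrent registration above unreliable.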
03
—
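How the Executor actually works
1. When the Driver sends a Task, it sends it to the CoarseGrainedExecutorBackend RpcEndpoint, not directly to the Executor. (The Executor is not a message loop, so it can never receive remote messages directly.)
2. After ExecutorBackend receives the message from the Driver, it calls launchTask to hand the task over to the Executor for execution.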
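The split between the endpoint that receives messages and the plain object that does the work can be illustrated with a small sketch. ToyBackend and ToyExecutor are hypothetical names, and the messages are simplified; Spark's LaunchTask carries a serialized TaskDescription rather than a bare ID.

```scala
object DispatchSketch {
  // Hypothetical messages, simplified from their Spark counterparts.
  final case class LaunchTask(taskId: Long)
  case object RegisteredExecutor

  // Plain object with no message loop: it can only be called locally.
  class ToyExecutor {
    val launched = scala.collection.mutable.ArrayBuffer.empty[Long]
    def launchTask(taskId: Long): Unit = {
      launched += taskId
    }
  }

  // The endpoint owns the message handling and forwards work to its executor.
  class ToyBackend {
    private var executor: ToyExecutor = null // created only after registration
    val errors = scala.collection.mutable.ArrayBuffer.empty[String]

    val receive: PartialFunction[Any, Unit] = {
      case RegisteredExecutor =>
        executor = new ToyExecutor
      case LaunchTask(taskId) =>
        if (executor == null) {
          errors += "Received LaunchTask command but executor was null"
        } else {
          executor.launchTask(taskId)
        }
    }

    def launchedTasks: Seq[Long] = if (executor == null) Nil else executor.launched.toList
  }
}
```

A LaunchTask that arrives before RegisteredExecutor is recorded as an error in this toy (Spark exits the executor process in that case), while one that arrives afterwards is forwarded to the executor.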
04
—
Executor startup on the Worker
1. DriverEndpoint handles the RegisterExecutor message from the executor side in the receiveAndReply method of CoarseGrainedSchedulerBackend. It creates a new ExecutorData object and stores it in executorDataMap inside a synchronized block, so the registration ends up in CoarseGrainedSchedulerBackend.
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {

  case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
      attributes, resources, resourceProfileId) =>
    if (executorDataMap.contains(executorId)) {
      context.sendFailure(new IllegalStateException(s"Duplicate executor ID: $executorId"))
    } else if (scheduler.excludedNodes.contains(hostname) ||
        isExecutorExcluded(executorId, hostname)) {
      // If the cluster manager gives us an executor on an excluded node (because it
      // already started allocating those resources before we informed it of our exclusion,
      // or if it ignored our exclusion), then we reject that executor immediately.
      logInfo(s"Rejecting $executorId as it has been excluded.")
      context.sendFailure(
        new IllegalStateException(s"Executor is excluded due to failures: $executorId"))
    } else {
      // If the executor's rpc env is not listening for incoming connections, `hostPort`
      // will be null, and the client connection should be used to contact the executor.
      val executorAddress = if (executorRef.address != null) {
        executorRef.address
      } else {
        context.senderAddress
      }
      logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId, " +
        s" ResourceProfileId $resourceProfileId")
      addressToExecutorId(executorAddress) = executorId
      totalCoreCount.addAndGet(cores)
      totalRegisteredExecutors.addAndGet(1)
      val resourcesInfo = resources.map { case (rName, info) =>
        // tell the executor it can schedule resources up to numSlotsPerAddress times,
        // as configured by the user, or set to 1 as that is the default (1 task/resource)
        val numParts = scheduler.sc.resourceProfileManager
          .resourceProfileFromId(resourceProfileId).getNumSlotsPerAddress(rName, conf)
        (info.name, new ExecutorResourceInfo(info.name, info.addresses, numParts))
      }
      val data = new ExecutorData(executorRef, executorAddress, hostname,
        0, cores, logUrlHandler.applyPattern(logUrls, attributes), attributes,
        resourcesInfo, resourceProfileId, registrationTs = System.currentTimeMillis())
      // This must be synchronized because variables mutated
      // in this block are read when requesting executors
      // (Many executors in the cluster register concurrently; the lock prevents write conflicts.)
      CoarseGrainedSchedulerBackend.this.synchronized {
        // Put the newly created ExecutorData object into the in-memory HashMap
        executorDataMap.put(executorId, data)
        if (currentExecutorIdCounter < executorId.toInt) {
          currentExecutorIdCounter = executorId.toInt
        }
      }
      listenerBus.post(
        SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
      // Note: some tests expect the reply to come after we put the executor in the map
      context.reply(true)
    }

  case StopDriver =>
    context.reply(true)
    stop()

  case StopExecutors =>
    logInfo("Asking each executor to shut down")
    for ((_, executorData) <- executorDataMap) {
      executorData.executorEndpoint.send(StopExecutor)
    }
    context.reply(true)

  case RemoveWorker(workerId, host, message) =>
    removeWorker(workerId, host, message)
    context.reply(true)

  // Do not change this code without running the K8s integration suites
  case ExecutorDecommissioning(executorId) =>
    logWarning(s"Received executor $executorId decommissioned message")
    context.reply(
      decommissionExecutor(
        executorId,
        ExecutorDecommissionInfo(s"Executor $executorId is decommissioned."),
        adjustTargetNumExecutors = false,
        triggeredByExecutor = true))

  case RetrieveSparkAppConfig(resourceProfileId) =>
    val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(resourceProfileId)
    val reply = SparkAppConfig(
      sparkProperties,
      SparkEnv.get.securityManager.getIOEncryptionKey(),
      Option(delegationTokens.get()),
      rp)
    context.reply(reply)

  case IsExecutorAlive(executorId) => context.reply(isExecutorActive(executorId))

  case e =>
    logError(s"Received unexpected ask ${e}")
}
(1.1) executorDataMap
private val executorDataMap = new HashMap[String, ExecutorData]
(1.2) ExecutorData
/**
 * Grouping of data for an executor used by CoarseGrainedSchedulerBackend.
 *
 * @param executorEndpoint The RpcEndpointRef representing this executor
 * @param executorAddress The network address of this executor
 * @param executorHost The hostname that this executor is running on
 * @param freeCores The current number of cores available for work on the executor
 * @param totalCores The total number of cores available to the executor
 * @param resourcesInfo The information of the currently available resources on the executor
 * @param resourceProfileId The id of the ResourceProfile being used by this executor
 * @param registrationTs The registration timestamp of this executor
 */
private[cluster] class ExecutorData(
    val executorEndpoint: RpcEndpointRef,
    val executorAddress: RpcAddress,
    override val executorHost: String,
    var freeCores: Int,
    override val totalCores: Int,
    override val logUrlMap: Map[String, String],
    override val attributes: Map[String, String],
    override val resourcesInfo: Map[String, ExecutorResourceInfo],
    override val resourceProfileId: Int,
    val registrationTs: Long
) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes,
  resourcesInfo, resourceProfileId)
2. After registration succeeds, CoarseGrainedExecutorBackend handles the RegisteredExecutor message by creating the Executor instance, which is what actually performs the Task computation.
override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    try {
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
        resources = _resources)
      driver.get.send(LaunchedExecutor(executorId))
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }
  // ... (excerpt ends here)
(2.1) The Executor data structure
Tasks are run on a thread pool.
/**
 * Spark executor, backed by a threadpool to run tasks.
 *
 * This can be used with Mesos, YARN, and the standalone scheduler.
 * An internal RPC interface is used for communication with the driver,
 * except in the case of Mesos fine-grained mode.
 */
private[spark] class Executor(
    executorId: String,
    executorHostname: String,
    env: SparkEnv,
    userClassPath: Seq[URL] = Nil,
    isLocal: Boolean = false,
    uncaughtExceptionHandler: UncaughtExceptionHandler = new SparkUncaughtExceptionHandler,
    resources: immutable.Map[String, ResourceInformation])
  extends Logging {

  // Start worker thread pool
  // Use UninterruptibleThread to run tasks so that we can allow running codes without being
  // interrupted by `Thread.interrupt()`. Some issues, such as KAFKA-1894, HADOOP-10622,
  // will hang forever if some methods are interrupted.
  private val threadPool = {
    val threadFactory = new ThreadFactoryBuilder()
      .setDaemon(true)
      .setNameFormat("Executor task launch worker-%d")
      .setThreadFactory((r: Runnable) => new UninterruptibleThread(r, "unused"))
      .build()
    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
  }
  // ... (excerpt ends here)
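The thread pool setup above can be approximated with the JDK alone. The sketch below is an assumption-laden simplification: it replaces Guava's ThreadFactoryBuilder with a hand-written ThreadFactory (namedDaemonFactory is a hypothetical helper) and uses ordinary threads instead of Spark's UninterruptibleThread.

```scala
import java.util.concurrent.{Callable, Executors, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object ThreadPoolSketch {
  // Plain-JDK substitute for ThreadFactoryBuilder: daemon threads with numbered names.
  def namedDaemonFactory(prefix: String): ThreadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
      t.setDaemon(true)
      t
    }
  }

  // A cached pool grows on demand, so every submitted task gets a worker thread.
  def newTaskPool(): ThreadPoolExecutor =
    Executors.newCachedThreadPool(namedDaemonFactory("Executor task launch worker"))
      .asInstanceOf[ThreadPoolExecutor]

  // Runs one job and reports (thread name, isDaemon) of the worker that executed it.
  def probe(): (String, Boolean) = {
    val pool = newTaskPool()
    try {
      val future = pool.submit(new Callable[(String, Boolean)] {
        override def call(): (String, Boolean) = {
          val t = Thread.currentThread()
          (t.getName, t.isDaemon)
        }
      })
      future.get(5, TimeUnit.SECONDS)
    } finally {
      pool.shutdown()
    }
  }
}
```

Daemon threads do not keep the JVM alive on their own, so a pool built this way never blocks process exit; the numbered names make the task threads easy to spot in a thread dump.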
05
—
CoarseGrainedExecutorBackend receives the LaunchTask message
1. Handling of LaunchTask in class CoarseGrainedExecutorBackend
case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    val taskDesc = TaskDescription.decode(data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    taskResources(taskDesc.taskId) = taskDesc.resources
    executor.launchTask(this, taskDesc)
  }
(1.2) Executor.launchTask creates a new TaskRunner and submits it to the thread pool
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription, plugins)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
  if (decommissioned) {
    log.error(s"Launching a task while in decommissioned state.")
  }
}
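(1.3) TaskRunner implements Runnable. It does not create a thread itself: once constructed, it is handed to the thread pool, and one of the pool's worker threads executes its run method, which in turn calls task.run.
class TaskRunner(
    execBackend: ExecutorBackend,
    private val taskDescription: TaskDescription,
    private val plugins: Option[PluginContainer])
  extends Runnable {

  override def run(): Unit = {
    // ... (excerpt; earlier setup omitted)
    val value = Utils.tryWithSafeFinally {
      val res = task.run(
        taskAttemptId = taskId,
        attemptNumber = taskDescription.attemptNumber,
        metricsSystem = env.metricsSystem,
        resources = taskDescription.resources,
        plugins = plugins)
      threwException = false
      res
      // ... (excerpt ends here)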
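The launchTask and TaskRunner pattern shown above (wrap the work in a Runnable, track it in a map, hand it to the pool) can be sketched in isolation. ToyExecutor and ToyTaskRunner are hypothetical names; the toy stores results in a concurrent map, whereas Spark reports task status back to the driver through the ExecutorBackend.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.concurrent.TrieMap
import scala.util.Try

object TaskRunnerSketch {
  class ToyExecutor {
    private val threadPool = Executors.newCachedThreadPool()
    private val runningTasks = TrieMap.empty[Long, ToyTaskRunner]
    private val results = TrieMap.empty[Long, Try[Int]]

    // The Runnable only describes the work; a pool worker thread calls run().
    class ToyTaskRunner(taskId: Long, body: () => Int) extends Runnable {
      override def run(): Unit = {
        try {
          results.put(taskId, Try(body())) // capture success or failure
        } finally {
          runningTasks.remove(taskId) // always unregister the finished task
        }
      }
    }

    def launchTask(taskId: Long)(body: () => Int): Unit = {
      val tr = new ToyTaskRunner(taskId, body)
      runningTasks.put(taskId, tr)
      threadPool.execute(tr)
    }

    // Stops accepting tasks and waits for the submitted ones to finish.
    def shutdownAndWait(): Boolean = {
      threadPool.shutdown()
      threadPool.awaitTermination(5, TimeUnit.SECONDS)
    }

    def result(taskId: Long): Option[Try[Int]] = results.get(taskId)
    def runningCount: Int = runningTasks.size
  }
}
```

Registering the runner in runningTasks before calling execute means a task can be looked up from the moment it is submitted, and removing it in a finally block keeps the map accurate even when the task body throws.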
/**
 * Called by [[org.apache.spark.executor.Executor]] to run this task.
 *
 * @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext.
 * @param attemptNumber how many times this task has been attempted (0 for the first attempt)
 * @param resources other host resources (like gpus) that this task attempt can access
 * @return the result of the task along with updates of Accumulators.
 */
final def run(
    taskAttemptId: Long,
    attemptNumber: Int,
    metricsSystem: MetricsSystem,
    resources: Map[String, ResourceInformation],
    plugins: Option[PluginContainer]): T = {
  SparkEnv.get.blockManager.registerTask(taskAttemptId)
  // TODO SPARK-24874 Allow create BarrierTaskContext based on partitions, instead of whether
  // the stage is barrier.
  val taskContext = new TaskContextImpl(
    stageId,
    stageAttemptId, // stageAttemptId and stageAttemptNumber are semantically equal
    partitionId,
    taskAttemptId,
    attemptNumber,
    taskMemoryManager,
    localProperties,
    metricsSystem,
    metrics,
    resources)

  context = if (isBarrier) {
    new BarrierTaskContext(taskContext)
  } else {
    taskContext
  }

  InputFileBlockHolder.initialize()
  TaskContext.setTaskContext(context)
  taskThread = Thread.currentThread()

  if (_reasonIfKilled != null) {
    kill(interruptThread = false, _reasonIfKilled)
  }

  new CallerContext(
    "TASK",
    SparkEnv.get.conf.get(APP_CALLER_CONTEXT),
    appId,
    appAttemptId,
    jobId,
    Option(stageId),
    Option(stageAttemptId),
    Option(taskAttemptId),
    Option(attemptNumber)).setCurrentContext()

  plugins.foreach(_.onTaskStart())

  try {
    runTask(context)
  } catch {
    case e: Throwable =>
      // Catch all errors; run task failure callbacks, and rethrow the exception.
      try {
        context.markTaskFailed(e)
      } catch {
        case t: Throwable =>
          e.addSuppressed(t)
      }
      context.markTaskCompleted(Some(e))
      throw e
  } finally {
    try {
      // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
      // one is no-op.
      context.markTaskCompleted(None)
    } finally {
      try {
        Utils.tryLogNonFatalError {
          // Release memory used by this thread for unrolling blocks
          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(
            MemoryMode.OFF_HEAP)
          // Notify any tasks waiting for execution memory to be freed to wake up and try to
          // acquire memory again. This makes impossible the scenario where a task sleeps forever
          // because there are no other tasks left to notify it. Since this is safe to do but may
          // not be strictly necessary, we should revisit whether we can remove this in the
          // future.
          val memoryManager = SparkEnv.get.memoryManager
          memoryManager.synchronized { memoryManager.notifyAll() }
        }
      } finally {
        // Though we unset the ThreadLocal here, the context member variable itself is still
        // queried directly in the TaskRunner to check for FetchFailedExceptions.
        TaskContext.unset()
        InputFileBlockHolder.unset()
      }
    }
  }
}
The runTask method takes the TaskContext object as its argument:
def runTask(context: TaskContext): T
Task is an abstract class; the runTask that actually gets called is the one implemented by ShuffleMapTask or ResultTask.
/**
 * A unit of execution. We have two kinds of Task's in Spark:
 *
 *  - [[org.apache.spark.scheduler.ShuffleMapTask]]
 *  - [[org.apache.spark.scheduler.ResultTask]]
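The relationship between the final run method and the abstract runTask is a template method: the base class fixes the setup and cleanup, and each task type supplies only the computation. The sketch below illustrates that shape under simplifying assumptions; ToyTask, ToyContext, ToyResultTask, and ToyShuffleMapTask are hypothetical names, and the "shuffle" here is just an in-memory groupBy rather than Spark's shuffle write.

```scala
object TaskTemplateSketch {
  // Hypothetical minimal context; Spark's TaskContext carries far more state.
  final class ToyContext(val partitionId: Int) {
    @volatile var completed = false
    @volatile var failure: Option[Throwable] = None
  }

  abstract class ToyTask[T](val partitionId: Int) {
    var context: ToyContext = null

    // final: subclasses cannot skip the setup and cleanup around runTask.
    final def run(): T = {
      context = new ToyContext(partitionId)
      try {
        runTask(context)
      } catch {
        case e: Throwable =>
          context.failure = Some(e) // record the failure, then rethrow
          throw e
      } finally {
        context.completed = true // completion is marked on both paths
      }
    }

    // The only method a concrete task type has to provide.
    def runTask(context: ToyContext): T
  }

  // Loose stand-in for ResultTask: computes a value that goes back to the caller.
  class ToyResultTask(partitionId: Int, data: Seq[Int]) extends ToyTask[Int](partitionId) {
    override def runTask(context: ToyContext): Int = data.sum
  }

  // Loose stand-in for ShuffleMapTask: splits its output into buckets for a later stage.
  class ToyShuffleMapTask(partitionId: Int, data: Seq[Int], numBuckets: Int)
    extends ToyTask[Map[Int, Seq[Int]]](partitionId) {
    override def runTask(context: ToyContext): Map[Int, Seq[Int]] =
      data.groupBy(_ % numBuckets)
  }
}
```

For example, new TaskTemplateSketch.ToyResultTask(0, Seq(1, 2, 3)).run() returns 6 and leaves context.completed set to true, while a task whose runTask throws still has its completion flag set before the exception propagates.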
Miracles often show up at the moment you check one more time!




