Spark Source Code Analysis (9): How the Executor Works

beenrun 2022-03-13

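This article walks through how an Executor is created and how it runs tasks: the executor backend registers with the driver, an Executor is instantiated, and each task is then run on the Executor's thread pool.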




01


Executor startup: registering with the driver

1. The onStart method of class CoarseGrainedExecutorBackend registers with the driver.

    override def onStart(): Unit = {
      if (env.conf.get(DECOMMISSION_ENABLED)) {
        logInfo("Registering PWR handler to trigger decommissioning.")
        SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
          "disabling executor decommission feature.") (self.askSync[Boolean](ExecutorSigPWRReceived))
      }

      logInfo("Connecting to driver: " + driverUrl)
      try {
        _resources = parseOrFindResources(resourcesFileOpt)
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }
      rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
        // This is a very fast action so we can use "ThreadUtils.sameThread"
        driver = Some(ref)
        // Send a RegisterExecutor message to register with the driver process.
        // Despite its name, RegisterExecutor does not register the Executor object:
        // what gets registered is this ExecutorBackend (note the `self` argument).
        // The Executor is only an object that the backend creates later.
        ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
          extractAttributes, _resources, resourceProfile.id))
      }(ThreadUtils.sameThread).onComplete {
        case Success(_) =>
          self.send(RegisteredExecutor)
        case Failure(e) =>
          exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
      }(ThreadUtils.sameThread)
    }


    // Executors to driver
    case class RegisterExecutor(
        executorId: String,
        executorRef: RpcEndpointRef,
        hostname: String,
        cores: Int,
        logUrls: Map[String, String],
        attributes: Map[String, String],
        resources: Map[String, ResourceInformation],
        resourceProfileId: Int)
      extends CoarseGrainedClusterMessage
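
The ask-then-notify-self shape of onStart can be illustrated in isolation. The following is a minimal sketch using only scala.concurrent; ToyDriver, ToyBackend, and the register helper are hypothetical names invented for this illustration and are not part of Spark, and the trimmed-down RegisterExecutor carries only three of the real fields.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object RegistrationSketch {
  // Hypothetical, trimmed-down stand-ins for the real messages.
  final case class RegisterExecutor(executorId: String, hostname: String, cores: Int)
  case object RegisteredExecutor

  // Hypothetical driver: replies true for a new ID and fails for a duplicate one.
  class ToyDriver {
    private val known = scala.collection.mutable.Set.empty[String]
    def ask(msg: RegisterExecutor): Future[Boolean] = synchronized {
      if (known.add(msg.executorId)) Future.successful(true)
      else Future.failed(
        new IllegalStateException(s"Duplicate executor ID: ${msg.executorId}"))
    }
  }

  // Hypothetical backend: asks the driver, then records the outcome for itself.
  class ToyBackend(driver: ToyDriver, executorId: String) {
    private val outcome = Promise[Any]()
    def onStart()(implicit ec: ExecutionContext): Unit =
      driver.ask(RegisterExecutor(executorId, "host-1", 2)).onComplete {
        case Success(_) => outcome.success(RegisteredExecutor) // like self.send(RegisteredExecutor)
        case Failure(e) => outcome.failure(e)                  // like exitExecutor(...)
      }
    def result: Future[Any] = outcome.future
  }

  // Runs one registration attempt and waits for its outcome.
  def register(driver: ToyDriver, id: String): Either[String, Any] = {
    import ExecutionContext.Implicits.global
    val backend = new ToyBackend(driver, id)
    backend.onStart()
    try Right(Await.result(backend.result, 5.seconds))
    catch { case e: IllegalStateException => Left(e.getMessage) }
  }
}
```

Registering the same ID twice against one ToyDriver succeeds the first time and fails with the duplicate-ID message the second time, which mirrors the Success and Failure branches of onComplete.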


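2. CoarseGrainedClusterMessage is a sealed trait:

    private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable


3. CoarseGrainedExecutorBackend is the name of the process in which the Executor runs, while the Executor is the object that actually processes tasks, using an internal thread pool to do the computation. CoarseGrainedExecutorBackend has a main method, so launching it creates a separate process.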

    def main(args: Array[String]): Unit = {
      val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
        CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
        new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
          arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
          env, arguments.resourcesFileOpt, resourceProfile)
      }
      run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
      System.exit(0)
    }


4. CoarseGrainedExecutorBackend and Executor are in one-to-one correspondence.

    private[spark] class CoarseGrainedExecutorBackend(
        override val rpcEnv: RpcEnv,
        driverUrl: String,
        executorId: String,
        bindAddress: String,
        hostname: String,
        cores: Int,
        userClassPath: Seq[URL],
        env: SparkEnv,
        resourcesFileOpt: Option[String],
        resourceProfile: ResourceProfile)
      extends IsolatedRpcEndpoint with ExecutorBackend with Logging {


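5. CoarseGrainedExecutorBackend is a message endpoint (concretely, it implements IsolatedRpcEndpoint). It can send messages to the driver and receive instructions from the driver, such as a request to launch a task.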




02


The driver process receives the message


1. The driver process has two important endpoints:

(1) ClientEndpoint: mainly responsible for registering the current application with the Master. It is an inner class of AppClient (named StandaloneAppClient in Spark 2.0 and later):

    private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
      with Logging

(2) DriverEndpoint: the driver of the whole running application. It is an inner class of CoarseGrainedSchedulerBackend, which is declared as

    private[spark]
    class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
      extends ExecutorAllocationClient with SchedulerBackend with Logging

and whose inner class DriverEndpoint is declared as

    class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
      extends ThreadSafeRpcEndpoint with Logging


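2. The driver wraps the ExecutorBackend's information in an ExecutorData object and registers it in executorDataMap, an in-memory data structure on the driver.


3. At run time, DriverEndpoint writes this information into executorDataMap, which belongs to CoarseGrainedSchedulerBackend, so the registration ultimately lands in CoarseGrainedSchedulerBackend. In other words, CoarseGrainedSchedulerBackend keeps track of every ExecutorBackend process allocated to the current application, and inside each ExecutorBackend process an Executor object is responsible for actually running tasks. The write to executorDataMap is guarded by the synchronized keyword so that concurrent registrations are safe.


4. Once the driver has acknowledged the registration, CoarseGrainedExecutorBackend handles a RegisteredExecutor message (in the onStart code above it sends this message to itself with self.send(RegisteredExecutor)) and creates the Executor instance, which is the object that actually carries out task computation.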


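03


How the Executor does its work

1. When the driver sends a task, it sends it to CoarseGrainedExecutorBackend, which is an RPC endpoint, rather than directly to the Executor. The Executor is not a message loop, so it can never receive messages sent from a remote node directly.


2. When the ExecutorBackend receives the message from the driver, it calls launchTask to hand the task over to the Executor for execution.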
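
The division of labor between the two objects can be sketched as follows. This is a simplified illustration, not Spark's implementation: ToyBackend and ToyExecutor are hypothetical names, and the real LaunchTask message carries a serialized TaskDescription rather than a bare task ID.

```scala
object BackendSketch {
  sealed trait Message
  case object RegisteredExecutor extends Message
  final case class LaunchTask(taskId: Long) extends Message

  // Plain object: it has no receive method, so it cannot be the target of a message.
  class ToyExecutor {
    val launched = scala.collection.mutable.ArrayBuffer.empty[Long]
    def launchTask(taskId: Long): Unit = launched += taskId
  }

  // The endpoint: every message arrives here and is dispatched by pattern matching.
  class ToyBackend {
    var executor: ToyExecutor = null       // created only after registration succeeds
    var exitReason: Option[String] = None  // stands in for exitExecutor(...)

    val receive: PartialFunction[Message, Unit] = {
      case RegisteredExecutor =>
        executor = new ToyExecutor
      case LaunchTask(id) =>
        if (executor == null) {
          exitReason = Some("Received LaunchTask command but executor was null")
        } else {
          executor.launchTask(id)
        }
    }
  }
}
```

Sending LaunchTask before RegisteredExecutor records the exit reason; sending it afterwards reaches ToyExecutor.launchTask, which is the only path by which work gets to the executor in this sketch.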


04


Executor startup on the Worker

1. On the driver side, the receiveAndReply method of DriverEndpoint (inside CoarseGrainedSchedulerBackend) handles the RegisterExecutor message sent by the executor backend. It creates a new ExecutorData object and puts it into executorDataMap inside a synchronized block, so the registration ends up in CoarseGrainedSchedulerBackend:


    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
          attributes, resources, resourceProfileId) =>
        if (executorDataMap.contains(executorId)) {
          context.sendFailure(new IllegalStateException(s"Duplicate executor ID: $executorId"))
        } else if (scheduler.excludedNodes.contains(hostname) ||
            isExecutorExcluded(executorId, hostname)) {
          // If the cluster manager gives us an executor on an excluded node (because it
          // already started allocating those resources before we informed it of our exclusion,
          // or if it ignored our exclusion), then we reject that executor immediately.
          logInfo(s"Rejecting $executorId as it has been excluded.")
          context.sendFailure(
            new IllegalStateException(s"Executor is excluded due to failures: $executorId"))
        } else {
          // If the executor's rpc env is not listening for incoming connections, `hostPort`
          // will be null, and the client connection should be used to contact the executor.
          val executorAddress = if (executorRef.address != null) {
            executorRef.address
          } else {
            context.senderAddress
          }
          logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId, " +
            s" ResourceProfileId $resourceProfileId")
          addressToExecutorId(executorAddress) = executorId
          totalCoreCount.addAndGet(cores)
          totalRegisteredExecutors.addAndGet(1)
          val resourcesInfo = resources.map { case (rName, info) =>
            // tell the executor it can schedule resources up to numSlotsPerAddress times,
            // as configured by the user, or set to 1 as that is the default (1 task/resource)
            val numParts = scheduler.sc.resourceProfileManager
              .resourceProfileFromId(resourceProfileId).getNumSlotsPerAddress(rName, conf)
            (info.name, new ExecutorResourceInfo(info.name, info.addresses, numParts))
          }
          val data = new ExecutorData(executorRef, executorAddress, hostname,
            0, cores, logUrlHandler.applyPattern(logUrls, attributes), attributes,
            resourcesInfo, resourceProfileId, registrationTs = System.currentTimeMillis())
          // This must be synchronized because variables mutated
          // in this block are read when requesting executors
          // Many executors in the cluster register concurrently, so guard against
          // conflicting writes.
          CoarseGrainedSchedulerBackend.this.synchronized {
            // Put the newly created ExecutorData object into the in-memory HashMap.
            executorDataMap.put(executorId, data)
            if (currentExecutorIdCounter < executorId.toInt) {
              currentExecutorIdCounter = executorId.toInt
            }
          }
          listenerBus.post(
            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
          // Note: some tests expect the reply to come after we put the executor in the map
          context.reply(true)
        }

      case StopDriver =>
        context.reply(true)
        stop()

      case StopExecutors =>
        logInfo("Asking each executor to shut down")
        for ((_, executorData) <- executorDataMap) {
          executorData.executorEndpoint.send(StopExecutor)
        }
        context.reply(true)

      case RemoveWorker(workerId, host, message) =>
        removeWorker(workerId, host, message)
        context.reply(true)

      // Do not change this code without running the K8s integration suites
      case ExecutorDecommissioning(executorId) =>
        logWarning(s"Received executor $executorId decommissioned message")
        context.reply(
          decommissionExecutor(
            executorId,
            ExecutorDecommissionInfo(s"Executor $executorId is decommissioned."),
            adjustTargetNumExecutors = false,
            triggeredByExecutor = true))

      case RetrieveSparkAppConfig(resourceProfileId) =>
        val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(resourceProfileId)
        val reply = SparkAppConfig(
          sparkProperties,
          SparkEnv.get.securityManager.getIOEncryptionKey(),
          Option(delegationTokens.get()),
          rp)
        context.reply(reply)

      case IsExecutorAlive(executorId) => context.reply(isExecutorActive(executorId))

      case e =>
        logError(s"Received unexpected ask ${e}")
    }


(1.1) executorDataMap

    private val executorDataMap = new HashMap[String, ExecutorData]

(1.2) ExecutorData

    /**
     * Grouping of data for an executor used by CoarseGrainedSchedulerBackend.
     *
     * @param executorEndpoint The RpcEndpointRef representing this executor
     * @param executorAddress The network address of this executor
     * @param executorHost The hostname that this executor is running on
     * @param freeCores The current number of cores available for work on the executor
     * @param totalCores The total number of cores available to the executor
     * @param resourcesInfo The information of the currently available resources on the executor
     * @param resourceProfileId The id of the ResourceProfile being used by this executor
     * @param registrationTs The registration timestamp of this executor
     */
    private[cluster] class ExecutorData(
        val executorEndpoint: RpcEndpointRef,
        val executorAddress: RpcAddress,
        override val executorHost: String,
        var freeCores: Int,
        override val totalCores: Int,
        override val logUrlMap: Map[String, String],
        override val attributes: Map[String, String],
        override val resourcesInfo: Map[String, ExecutorResourceInfo],
        override val resourceProfileId: Int,
        val registrationTs: Long
    ) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes,
      resourcesInfo, resourceProfileId)
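
The bookkeeping performed on registration (a map keyed by executor ID, a running maximum of the IDs seen, and a lock around the write) can be sketched on its own. ToyRegistry and ToyExecutorData below are hypothetical names for this illustration. One deliberate difference from the excerpt above: the sketch also performs the duplicate check inside the lock, which is a simplification chosen here and not a claim about Spark's code.

```scala
import scala.collection.mutable.HashMap

object RegistrySketch {
  // Hypothetical, much smaller stand-in for ExecutorData.
  final case class ToyExecutorData(host: String, totalCores: Int, freeCores: Int)

  class ToyRegistry {
    private val executorDataMap = new HashMap[String, ToyExecutorData]
    private var currentExecutorIdCounter = 0

    /** Returns Left(reason) for a duplicate ID and Right(()) on success. */
    def register(executorId: String, host: String, cores: Int): Either[String, Unit] =
      this.synchronized {
        if (executorDataMap.contains(executorId)) {
          Left(s"Duplicate executor ID: $executorId")
        } else {
          // freeCores starts at 0, as in the ExecutorData constructor call above.
          executorDataMap.put(executorId, ToyExecutorData(host, cores, 0))
          if (currentExecutorIdCounter < executorId.toInt) {
            currentExecutorIdCounter = executorId.toInt
          }
          Right(())
        }
      }

    def size: Int = this.synchronized(executorDataMap.size)
    def maxId: Int = this.synchronized(currentExecutorIdCounter)
  }
}
```

Because scala.collection.mutable.HashMap is not thread-safe, every read and write in the sketch goes through the same lock; without it, concurrent register calls could corrupt the map or lose an update to the counter.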




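2. Once registration has succeeded, CoarseGrainedExecutorBackend handles the RegisteredExecutor message in its receive method and creates the Executor instance, which is the object that actually carries out task computation. It then reports LaunchedExecutor to the driver: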

    override def receive: PartialFunction[Any, Unit] = {
      case RegisteredExecutor =>
        logInfo("Successfully registered with driver")
        try {
          executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
            resources = _resources)
          driver.get.send(LaunchedExecutor(executorId))
        } catch {
          case NonFatal(e) =>
            exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
        }
      // ... (excerpt: the remaining cases are not shown)


(2.1) The Executor data structure

Tasks are run on a thread pool:

    /**
     * Spark executor, backed by a threadpool to run tasks.
     *
     * This can be used with Mesos, YARN, and the standalone scheduler.
     * An internal RPC interface is used for communication with the driver,
     * except in the case of Mesos fine-grained mode.
     */
    private[spark] class Executor(
        executorId: String,
        executorHostname: String,
        env: SparkEnv,
        userClassPath: Seq[URL] = Nil,
        isLocal: Boolean = false,
        uncaughtExceptionHandler: UncaughtExceptionHandler = new SparkUncaughtExceptionHandler,
        resources: immutable.Map[String, ResourceInformation])
      extends Logging {

      // Start worker thread pool
      // Use UninterruptibleThread to run tasks so that we can allow running codes without being
      // interrupted by `Thread.interrupt()`. Some issues, such as KAFKA-1894, HADOOP-10622,
      // will hang forever if some methods are interrupted.
      private val threadPool = {
        val threadFactory = new ThreadFactoryBuilder()
          .setDaemon(true)
          .setNameFormat("Executor task launch worker-%d")
          .setThreadFactory((r: Runnable) => new UninterruptibleThread(r, "unused"))
          .build()
        Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
      }
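
A pool configured along the same lines can be built with the JDK alone. The sketch below is an approximation under stated assumptions: it replaces Guava's ThreadFactoryBuilder with a hand-written ThreadFactory and uses plain Thread instead of Spark's UninterruptibleThread; namedDaemonFactory, newPool, and probe are hypothetical helper names.

```scala
import java.util.concurrent.{Callable, Executors, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object ThreadPoolSketch {
  // Hand-written replacement for ThreadFactoryBuilder (an assumption of this sketch):
  // daemon threads named "<prefix>-0", "<prefix>-1", and so on.
  def namedDaemonFactory(prefix: String): ThreadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
      t.setDaemon(true)
      t
    }
  }

  // A cached pool creates threads on demand and reuses idle ones.
  def newPool(): ThreadPoolExecutor =
    Executors.newCachedThreadPool(namedDaemonFactory("Executor task launch worker"))
      .asInstanceOf[ThreadPoolExecutor]

  /** Runs one job on a fresh pool and reports (thread name, isDaemon). */
  def probe(): (String, Boolean) = {
    val pool = newPool()
    try {
      val future = pool.submit(new Callable[(String, Boolean)] {
        override def call(): (String, Boolean) = {
          val t = Thread.currentThread()
          (t.getName, t.isDaemon)
        }
      })
      future.get(5, TimeUnit.SECONDS)
    } finally {
      pool.shutdown()
    }
  }
}
```

Daemon threads do not keep the JVM alive on their own, which matters for a process whose main method ends with an explicit System.exit(0) as shown earlier.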

05


CoarseGrainedExecutorBackend receives the LaunchTask message


1. The LaunchTask case in class CoarseGrainedExecutorBackend:

    case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        val taskDesc = TaskDescription.decode(data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        taskResources(taskDesc.taskId) = taskDesc.resources
        executor.launchTask(this, taskDesc)
      }


(1.2) Executor.launchTask creates a new TaskRunner:

    def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
      val tr = new TaskRunner(context, taskDescription, plugins)
      runningTasks.put(taskDescription.taskId, tr)
      threadPool.execute(tr)
      if (decommissioned) {
        log.error(s"Launching a task while in decommissioned state.")
      }
    }

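(1.3) TaskRunner implements Runnable. It is not a thread itself: once created, it is submitted to the thread pool, and one of the pool's worker threads runs it.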

    class TaskRunner(
        execBackend: ExecutorBackend,
        private val taskDescription: TaskDescription,
        private val plugins: Option[PluginContainer])
      extends Runnable {

      override def run(): Unit = {
        // ... (excerpt: setup code is not shown)
        val value = Utils.tryWithSafeFinally {
          val res = task.run(
            taskAttemptId = taskId,
            attemptNumber = taskDescription.attemptNumber,
            metricsSystem = env.metricsSystem,
            resources = taskDescription.resources,
            plugins = plugins)
          threwException = false
          res
          // ... (excerpt ends here)
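
The launch path (wrap the work in a Runnable, record it in a map of running tasks, hand it to the pool) can be sketched as below. ToyExecutor, ToyTaskRunner, and runAll are hypothetical names; the CountDownLatch exists only so the example can wait for completion, and TrieMap is used here as a convenient concurrent map rather than because Spark uses it.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.collection.concurrent.TrieMap

object LaunchSketch {
  class ToyExecutor {
    private val threadPool = Executors.newCachedThreadPool()
    val runningTasks = TrieMap.empty[Long, ToyTaskRunner]
    val results = TrieMap.empty[Long, Int]

    // A Runnable is a unit of work, not a thread; the pool supplies the thread.
    class ToyTaskRunner(taskId: Long, body: () => Int, done: CountDownLatch)
      extends Runnable {
      override def run(): Unit =
        try {
          results.put(taskId, body())
        } finally {
          runningTasks.remove(taskId) // the task is no longer running
          done.countDown()
        }
    }

    def launchTask(taskId: Long, body: () => Int, done: CountDownLatch): Unit = {
      val tr = new ToyTaskRunner(taskId, body, done)
      runningTasks.put(taskId, tr) // register before submitting, as launchTask does
      threadPool.execute(tr)
    }

    def shutdown(): Unit = threadPool.shutdown()
  }

  /** Launches n tasks that each square their ID and returns the collected results. */
  def runAll(n: Int): Map[Long, Int] = {
    val exec = new ToyExecutor
    val done = new CountDownLatch(n)
    (1 to n).foreach(i => exec.launchTask(i.toLong, () => i * i, done))
    done.await(5, TimeUnit.SECONDS)
    exec.shutdown()
    exec.results.toMap
  }
}
```

Putting the runner into runningTasks before calling execute means the entry is always present by the time the task removes it in its finally block.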


TaskRunner.run calls task.run, which is defined in the abstract class Task:

    /**
     * Called by [[org.apache.spark.executor.Executor]] to run this task.
     *
     * @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext.
     * @param attemptNumber how many times this task has been attempted (0 for the first attempt)
     * @param resources other host resources (like gpus) that this task attempt can access
     * @return the result of the task along with updates of Accumulators.
     */
    final def run(
        taskAttemptId: Long,
        attemptNumber: Int,
        metricsSystem: MetricsSystem,
        resources: Map[String, ResourceInformation],
        plugins: Option[PluginContainer]): T = {
      SparkEnv.get.blockManager.registerTask(taskAttemptId)
      // TODO SPARK-24874 Allow create BarrierTaskContext based on partitions, instead of whether
      // the stage is barrier.
      val taskContext = new TaskContextImpl(
        stageId,
        stageAttemptId, // stageAttemptId and stageAttemptNumber are semantically equal
        partitionId,
        taskAttemptId,
        attemptNumber,
        taskMemoryManager,
        localProperties,
        metricsSystem,
        metrics,
        resources)

      context = if (isBarrier) {
        new BarrierTaskContext(taskContext)
      } else {
        taskContext
      }

      InputFileBlockHolder.initialize()
      TaskContext.setTaskContext(context)
      taskThread = Thread.currentThread()

      if (_reasonIfKilled != null) {
        kill(interruptThread = false, _reasonIfKilled)
      }

      new CallerContext(
        "TASK",
        SparkEnv.get.conf.get(APP_CALLER_CONTEXT),
        appId,
        appAttemptId,
        jobId,
        Option(stageId),
        Option(stageAttemptId),
        Option(taskAttemptId),
        Option(attemptNumber)).setCurrentContext()

      plugins.foreach(_.onTaskStart())

      try {
        runTask(context)
      } catch {
        case e: Throwable =>
          // Catch all errors; run task failure callbacks, and rethrow the exception.
          try {
            context.markTaskFailed(e)
          } catch {
            case t: Throwable =>
              e.addSuppressed(t)
          }
          context.markTaskCompleted(Some(e))
          throw e
      } finally {
        try {
          // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
          // one is no-op.
          context.markTaskCompleted(None)
        } finally {
          try {
            Utils.tryLogNonFatalError {
              // Release memory used by this thread for unrolling blocks
              SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
              SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(
                MemoryMode.OFF_HEAP)
              // Notify any tasks waiting for execution memory to be freed to wake up and try to
              // acquire memory again. This makes impossible the scenario where a task sleeps forever
              // because there are no other tasks left to notify it. Since this is safe to do but may
              // not be strictly necessary, we should revisit whether we can remove this in the
              // future.
              val memoryManager = SparkEnv.get.memoryManager
              memoryManager.synchronized { memoryManager.notifyAll() }
            }
          } finally {
            // Though we unset the ThreadLocal here, the context member variable itself is still
            // queried directly in the TaskRunner to check for FetchFailedExceptions.
            TaskContext.unset()
            InputFileBlockHolder.unset()
          }
        }
      }
    }

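runTask takes the TaskContext object as its parameter and is abstract in Task:

    def runTask(context: TaskContext): T


Task is an abstract class; the implementation that actually runs is ShuffleMapTask or ResultTask: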

    /**
     * A unit of execution. We have two kinds of Task's in Spark:
     *
     *  - [[org.apache.spark.scheduler.ShuffleMapTask]]
     *  - [[org.apache.spark.scheduler.ResultTask]]
     * ...
     */
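
The relationship between the final run method and the abstract runTask is a template method: the base class owns setup, failure handling, and cleanup, and subclasses supply only the work. The sketch below illustrates that shape with hypothetical classes (ToyContext, ToyTask, ToyShuffleMapTask, ToyResultTask, FailingTask). The toy subclasses only hint at the real ones: grouping records stands in for writing shuffle output, and summing stands in for computing a result for the driver.

```scala
object TaskSketch {
  class ToyContext {
    val events = scala.collection.mutable.ArrayBuffer.empty[String]
    private var completed = false
    def markTaskFailed(e: Throwable): Unit = events += s"failed: ${e.getMessage}"
    // Idempotent: a second call is a no-op, as the comment in Task.run describes.
    def markTaskCompleted(): Unit =
      if (!completed) {
        completed = true
        events += "completed"
      }
  }

  abstract class ToyTask[T] {
    // Supplied by each concrete task type.
    def runTask(context: ToyContext): T

    // Fixed skeleton: run the work, report failure, always mark completion.
    final def run(context: ToyContext): T =
      try {
        runTask(context)
      } catch {
        case e: Throwable =>
          context.markTaskFailed(e)
          context.markTaskCompleted()
          throw e
      } finally {
        context.markTaskCompleted()
      }
  }

  // Hypothetical counterpart of ShuffleMapTask: partitions its input by a key.
  class ToyShuffleMapTask(records: Seq[String]) extends ToyTask[Map[Int, Seq[String]]] {
    def runTask(context: ToyContext): Map[Int, Seq[String]] =
      records.groupBy(_.length % 2)
  }

  // Hypothetical counterpart of ResultTask: computes a final value.
  class ToyResultTask(records: Seq[Int]) extends ToyTask[Int] {
    def runTask(context: ToyContext): Int = records.sum
  }

  class FailingTask extends ToyTask[Int] {
    def runTask(context: ToyContext): Int = throw new RuntimeException("boom")
  }
}
```

Running FailingTask records one failure event followed by exactly one completion event, even though markTaskCompleted is called from both the catch and the finally branch.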


Thanks for reading. Likes, bookmarks, and follows are appreciated.


Miracles tend to show up right when you check one more time!



