Hadoop YARN: A Source-Code Walkthrough of ApplicationMaster and ResourceManager Interaction

大数据真有意思 ("Big Data Is Really Interesting") 2020-06-24


ApplicationMaster<-->ResourceManager


Roles involved in a "generic" YARN application, and how they interact:

RM:ResourceManager

AM:ApplicationMaster

NM:NodeManager


Main communication protocols used in these interactions:

ApplicationClientProtocol

ApplicationMasterProtocol

ContainerManagementProtocol


Client<-->ResourceManager

The client program interacts with the RM through a YarnClient object.

ApplicationMaster<-->ResourceManager

The AM interacts with the RM through an AMRMClientAsync object, and events are handled asynchronously by an AMRMClientAsync.CallbackHandler.

ApplicationMaster<-->NodeManager

The AM interacts with the NM through an NMClientAsync object, mainly to start Containers, and Container events are handled asynchronously by an NMClientAsync.CallbackHandler.

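Proto message definitions used by these interfaces: the request and response messages are defined in yarn_service_protos.proto, and the basic records they use (containers, resources, and so on) are defined in

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto

Both files are in the same directory.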

Hadoop version: 3.2.1

Flink version: 1.10


1. Example: the JobManager process YarnJobClusterEntrypoint in Flink's YARN per-job mode

// Starting point: YarnJobClusterEntrypoint#main

// End point: YarnResourceManager

    /**
     * The yarn implementation of the resource manager. Used when the system is started
     * via the resource framework YARN.
     */
    public class YarnResourceManager extends ActiveResourceManager<YarnWorkerNode>
            implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {
        // This class is what plays the ApplicationMaster role
        ...
        /** Client to communicate with the Resource Manager (YARN's master). */
        private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;

        /** Client to communicate with the Node manager and launch TaskExecutor processes. */
        private NMClientAsync nodeManagerClient;
        ...
    }
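The following is a minimal toy sketch, not Flink's actual code, of one object that receives both ResourceManager callbacks and NodeManager callbacks, the way YarnResourceManager does. The interfaces ToyRmCallback and ToyNmCallback and the class ToyNmClient are hypothetical stand-ins. They keep a few method names from AMRMClientAsync.CallbackHandler and NMClientAsync.CallbackHandler, but they use plain String container IDs in place of Hadoop's Container/ContainerId types. Nothing in this sketch talks to a real cluster, and the NM callback fires synchronously.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for AMRMClientAsync.CallbackHandler (two methods kept, simplified types).
interface ToyRmCallback {
    void onContainersAllocated(List<String> containerIds);
    void onContainersCompleted(List<String> containerIds);
}

// Hypothetical stand-in for NMClientAsync.CallbackHandler (two methods kept, simplified types).
interface ToyNmCallback {
    void onContainerStarted(String containerId);
    void onStartContainerError(String containerId, Throwable t);
}

// Hypothetical stand-in for NMClientAsync: "starts" a container and reports back via the callback.
class ToyNmClient {
    private final ToyNmCallback callback;

    ToyNmClient(ToyNmCallback callback) {
        this.callback = callback;
    }

    void startContainerAsync(String containerId) {
        // A real client would contact the NodeManager over ContainerManagementProtocol here.
        callback.onContainerStarted(containerId);
    }
}

// Plays the part YarnResourceManager plays: one object receives both kinds of callbacks.
class ToyYarnResourceManager implements ToyRmCallback, ToyNmCallback {
    final List<String> runningWorkers = new ArrayList<>();
    private final ToyNmClient nodeManagerClient = new ToyNmClient(this);

    @Override
    public void onContainersAllocated(List<String> containerIds) {
        // The RM granted containers: ask the NodeManagers to launch a worker in each one.
        for (String id : containerIds) {
            nodeManagerClient.startContainerAsync(id);
        }
    }

    @Override
    public void onContainersCompleted(List<String> containerIds) {
        runningWorkers.removeAll(containerIds);
    }

    @Override
    public void onContainerStarted(String containerId) {
        runningWorkers.add(containerId);
    }

    @Override
    public void onStartContainerError(String containerId, Throwable t) {
        // A real implementation would typically log this and request a replacement container.
    }
}
```

When the RM grants containers, the same object asks the NM client to launch workers in them. It then learns that each launch succeeded through its own NM-side callback.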

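AMRMClientAsync

AMRMClientAsync is an abstract class that handles communication and interaction with the ResourceManager. It provides asynchronous updates on events such as Container allocations and Container completions, and it contains a thread that sends periodic heartbeats to the ResourceManager. YARN applications do not usually subclass it themselves. Instead, they obtain the built-in implementation, AMRMClientAsyncImpl, through the static factory method AMRMClientAsync.createAMRMClientAsync.

What the application does implement is the callback interface AMRMClientAsync.CallbackHandler, or the abstract class AMRMClientAsync.AbstractCallbackHandler in newer Hadoop versions. AMRMClientAsync invokes this handler as events arrive.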


2. A simple example: MyCallbackHandler
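Here is a minimal, self-contained sketch of such a handler. It uses a hypothetical interface, ToyCallbackHandler, whose six method names match AMRMClientAsync.CallbackHandler: onContainersCompleted, onContainersAllocated, onShutdownRequest, onNodesUpdated, getProgress and onError. Plain String parameters replace Hadoop's ContainerStatus, Container and NodeReport types so that the sketch compiles without Hadoop on the classpath. The task-counting logic is illustrative only.

```java
import java.util.List;

// Hypothetical, simplified mirror of AMRMClientAsync.CallbackHandler: same method names,
// but Strings instead of ContainerStatus, Container and NodeReport.
interface ToyCallbackHandler {
    void onContainersCompleted(List<String> completedContainerIds);
    void onContainersAllocated(List<String> allocatedContainerIds);
    void onShutdownRequest();
    void onNodesUpdated(List<String> updatedNodes);
    float getProgress();
    void onError(Throwable e);
}

class MyCallbackHandler implements ToyCallbackHandler {
    private final int totalTasks;
    private int completed;
    private volatile boolean done;

    MyCallbackHandler(int totalTasks) {
        this.totalTasks = totalTasks;
    }

    @Override
    public void onContainersAllocated(List<String> allocatedContainerIds) {
        // Real code would launch a task in each container, e.g. via NMClientAsync.
    }

    @Override
    public void onContainersCompleted(List<String> completedContainerIds) {
        // Update progress and check whether the application is finished.
        completed += completedContainerIds.size();
        if (completed >= totalTasks) {
            done = true;
        }
    }

    @Override
    public void onShutdownRequest() {
        done = true; // the RM asked this AM to shut down
    }

    @Override
    public void onNodesUpdated(List<String> updatedNodes) {
        // Not needed for this example.
    }

    @Override
    public float getProgress() {
        // Expected to be a value between 0 and 1.
        return totalTasks == 0 ? 1.0f : Math.min(1.0f, (float) completed / totalTasks);
    }

    @Override
    public void onError(Throwable e) {
        done = true; // give up on unrecoverable errors
    }

    boolean isDone() {
        return done;
    }
}
```

In Hadoop's async client, the value returned by getProgress() is picked up and sent to the RM along with the heartbeats. For that reason, real handlers keep this method cheap.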

Lifecycle of the AMRMClientAsync client
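The usage pattern shown in the AMRMClientAsync class Javadoc has the following steps:

1. Create the client with createAMRMClientAsync.
2. Call init, then start.
3. Call registerApplicationMaster.
4. Call addContainerRequest as often as needed.
5. Call unregisterApplicationMaster.
6. Call stop.

The toy below enforces only that order and rejects out-of-order calls. It is a hypothetical class, ToyAmRmClientLifecycle, with no Hadoop dependency. It is deliberately stricter than the real client and sends nothing over the network.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy: enforces the call order recommended for an AMRMClientAsync-style
// client. Method names mirror Hadoop's; no RPC is made. Stricter than the real client.
class ToyAmRmClientLifecycle {
    enum State { CREATED, INITED, STARTED, REGISTERED, UNREGISTERED, STOPPED }

    private State state = State.CREATED;
    // Records the accepted calls in order, for inspection.
    final List<String> calls = new ArrayList<>();

    // Moves from the expected state to the next one, or rejects the call.
    private void transition(State expected, State next, String call) {
        if (state != expected) {
            throw new IllegalStateException(call + " is not allowed in state " + state);
        }
        state = next;
        calls.add(call);
    }

    void init() { transition(State.CREATED, State.INITED, "init"); }

    void start() { transition(State.INITED, State.STARTED, "start"); }

    void registerApplicationMaster() {
        transition(State.STARTED, State.REGISTERED, "registerApplicationMaster");
    }

    // In this toy, container requests may be recorded any time after start().
    void addContainerRequest(String request) {
        if (state != State.STARTED && state != State.REGISTERED) {
            throw new IllegalStateException("addContainerRequest is not allowed in state " + state);
        }
        calls.add("addContainerRequest(" + request + ")");
    }

    void unregisterApplicationMaster() {
        transition(State.REGISTERED, State.UNREGISTERED, "unregisterApplicationMaster");
    }

    void stop() { transition(State.UNREGISTERED, State.STOPPED, "stop"); }

    State state() { return state; }
}
```

In a real AM, the callbacks do their work during the wait between addContainerRequest and unregisterApplicationMaster. Unregistering before stop is how the AM reports its final status to the RM. An AM that simply exits without unregistering is generally treated as a failed attempt.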


3. AMRMClientAsync (partial source)


4. AMRMClientAsyncImpl (partial source)
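AMRMClientAsyncImpl splits its work between two threads:

- A heartbeat thread (HeartbeatThread) periodically calls the synchronous client's allocate method and puts each response on a queue.
- A handler thread (CallbackHandlerThread) drains that queue and invokes the user's callbacks, so a slow callback does not delay heartbeats.

The sketch below reproduces only that pattern, under these simplifying assumptions:

- ToyAsyncClient is hypothetical.
- A Supplier stands in for AMRMClient#allocate.
- A response is reduced to a list of container IDs.
- Both threads start together. In Hadoop, the heartbeat thread is started when the AM registers.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical toy of the heartbeat-thread / handler-thread split.
class ToyAsyncClient {
    private final BlockingQueue<List<String>> responseQueue = new LinkedBlockingQueue<>();
    private final Thread heartbeatThread;
    private final Thread handlerThread;
    private volatile boolean keepRunning = true;

    ToyAsyncClient(long intervalMs, Supplier<List<String>> allocate,
                   Consumer<List<String>> onContainersAllocated) {
        heartbeatThread = new Thread(() -> {
            while (keepRunning) {
                // allocate() doubles as the heartbeat; queue its response for the handler thread.
                responseQueue.add(allocate.get());
                try {
                    Thread.sleep(intervalMs);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }, "toy-heartbeat");
        handlerThread = new Thread(() -> {
            while (keepRunning) {
                try {
                    List<String> allocated = responseQueue.take();
                    if (!allocated.isEmpty()) {
                        onContainersAllocated.accept(allocated); // user callback runs here
                    }
                } catch (InterruptedException e) {
                    return;
                }
            }
        }, "toy-callback-handler");
        heartbeatThread.setDaemon(true);
        handlerThread.setDaemon(true);
    }

    void start() {
        handlerThread.start();
        heartbeatThread.start();
    }

    void stop() {
        keepRunning = false;
        heartbeatThread.interrupt();
        handlerThread.interrupt();
        try {
            heartbeatThread.join();
            handlerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The queue decouples the two threads. The heartbeat keeps its rhythm even if a callback blocks, and callbacks always run on a single thread, one response at a time, in the order the responses arrived.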


5. AMRMClient (partial source)

    package org.apache.hadoop.yarn.client.api;

    import ...

    // The abstract class AMRMClient
    public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
            AbstractService {
        ...
    }


6. AMRMClientImpl (partial source)
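One detail of AMRMClientImpl is worth illustrating. addContainerRequest and releaseAssignedContainer only record entries locally, in the client's ask and release collections. The next allocate call then ships everything pending in a single AllocateRequest, together with the last responseId. If that RPC fails, the unsent entries are put back for the next heartbeat.

The following is a simplified toy of that batching. It assumes String entries in place of ResourceRequest/ContainerId and a Function in place of the RPC. ToyBatchingClient and ToyAllocateRequest are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical toy of the ask/release batching idea.
class ToyBatchingClient {
    // Stand-in for AllocateRequest: a snapshot of everything pending at send time.
    static final class ToyAllocateRequest {
        final int responseId;
        final List<String> asks;
        final List<String> releases;

        ToyAllocateRequest(int responseId, List<String> asks, List<String> releases) {
            this.responseId = responseId;
            this.asks = asks;
            this.releases = releases;
        }
    }

    private final Set<String> ask = new LinkedHashSet<>();     // pending container requests
    private final Set<String> release = new LinkedHashSet<>(); // containers to hand back
    private int lastResponseId = 0;
    // Stand-in for the RPC; returns the responseId of the RM's reply.
    private final Function<ToyAllocateRequest, Integer> rpc;

    ToyBatchingClient(Function<ToyAllocateRequest, Integer> rpc) {
        this.rpc = rpc;
    }

    synchronized void addContainerRequest(String request) { ask.add(request); }

    synchronized void releaseAssignedContainer(String containerId) { release.add(containerId); }

    void allocate() {
        ToyAllocateRequest request;
        synchronized (this) {
            // Snapshot and clear, so new entries can be added while the RPC is in flight.
            request = new ToyAllocateRequest(lastResponseId,
                    new ArrayList<>(ask), new ArrayList<>(release));
            ask.clear();
            release.clear();
        }
        try {
            int newResponseId = rpc.apply(request);
            synchronized (this) {
                lastResponseId = newResponseId;
            }
        } catch (RuntimeException e) {
            synchronized (this) {
                // The call failed: keep the unsent entries for the next heartbeat.
                ask.addAll(request.asks);
                release.addAll(request.releases);
            }
            throw e;
        }
    }
}
```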


7. ApplicationMasterProtocol (partial source)

The ApplicationMasterProtocol interface is simple. It has only three methods.

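    package org.apache.hadoop.yarn.api;

    import ...

    // The ApplicationMasterProtocol interface
    public interface ApplicationMasterProtocol {
        // The AM registers itself with the RM
        public RegisterApplicationMasterResponse registerApplicationMaster(
                RegisterApplicationMasterRequest request) throws YarnException, IOException;

        // The AM asks the RM to unregister it. The AM may have finished successfully,
        // or the application may have failed.
        public FinishApplicationMasterResponse finishApplicationMaster(
                FinishApplicationMasterRequest request) throws YarnException, IOException;

        // The main method between the AM and the RM: it processes an AllocateRequest and
        // returns an AllocateResponse. This is how Containers are requested, and requests
        // and responses are batched (e.g. the Flink JobManager asking for 3 TaskManagers in
        // one call). The call also serves as the AM's heartbeat to the RM.
        // At-most-once semantics (@AtMostOnce in the Hadoop source): a retried call is not
        // executed twice, so Containers are neither duplicated nor over-allocated.
        public AllocateResponse allocate(AllocateRequest request)
                throws YarnException, IOException;
    }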
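The protocol's calls follow a fixed order:

1. registerApplicationMaster.
2. Any number of allocate calls.
3. finishApplicationMaster.

The toy below is an in-memory stand-in for the RM side that models only this ordering and the batched request. ToyApplicationMasterProtocol is a hypothetical class. It throws IllegalStateException where YARN uses its own exceptions, such as ApplicationMasterNotRegisteredException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for the RM side of ApplicationMasterProtocol.
// It models only the call order: register, then allocate (any number of times), then finish.
class ToyApplicationMasterProtocol {
    private boolean registered;
    private boolean finished;
    private int nextContainerId = 1;

    synchronized void registerApplicationMaster(String host, int rpcPort) {
        if (registered) {
            throw new IllegalStateException("Application Master is already registered");
        }
        registered = true;
    }

    // One call can ask for several containers at once (a batch), e.g. 3 TaskManagers.
    // Simplification: containers are granted immediately in the same response.
    synchronized List<String> allocate(int numContainers) {
        if (!registered || finished) {
            throw new IllegalStateException("Application Master is not registered");
        }
        List<String> granted = new ArrayList<>();
        for (int i = 0; i < numContainers; i++) {
            granted.add("container_" + nextContainerId++);
        }
        return granted;
    }

    synchronized void finishApplicationMaster(String finalStatus) {
        if (!registered) {
            throw new IllegalStateException("Application Master is not registered");
        }
        finished = true;
    }
}
```

Keep one simplification in mind. This toy grants containers in the response to the same call. A real RM's scheduler allocates asynchronously, so the containers usually show up in the responses to later heartbeats.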


8. ApplicationMasterProtocolPBClientImpl (partial source)

    package org.apache.hadoop.yarn.api.impl.pb.client;

    import ...

    // Client-side implementation of the ApplicationMasterProtocol interface
    public class ApplicationMasterProtocolPBClientImpl implements ApplicationMasterProtocol, Closeable {

        private ApplicationMasterProtocolPB proxy;

        public ApplicationMasterProtocolPBClientImpl(long clientVersion, InetSocketAddress addr,
                Configuration conf) throws IOException {
            RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class, ProtobufRpcEngine.class);
            // Under the hood this ends up calling java.lang.reflect.Proxy#newProxyInstance
            proxy = (ApplicationMasterProtocolPB) RPC.getProxy(ApplicationMasterProtocolPB.class,
                    clientVersion, addr, conf);
        }
        ...
    }
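RPC.getProxy returns an object whose method calls are intercepted by an InvocationHandler. In Hadoop's ProtobufRpcEngine, that handler serializes the call and sends it to the server.

The sketch below shows only the java.lang.reflect.Proxy mechanism. It uses hypothetical names: ToyProtocol, ToyServer and ToyRpc.getProxy. Its handler logs the method name and invokes a local object instead of going over the network.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.List;

// Hypothetical protocol interface, standing in for ApplicationMasterProtocolPB.
interface ToyProtocol {
    String registerApplicationMaster(String host);
}

// Hypothetical server-side object that actually handles the call.
class ToyServer implements ToyProtocol {
    @Override
    public String registerApplicationMaster(String host) {
        return "registered " + host;
    }
}

class ToyRpc {
    // Rough analogue of RPC.getProxy: every call on the returned object is routed through the
    // InvocationHandler. A real RPC engine would serialize the method and arguments there and
    // send them to the server; this toy records the method name and calls a local object.
    static <T> T getProxy(Class<T> protocol, Object server, List<String> wireLog) {
        InvocationHandler handler = (proxy, method, args) -> {
            wireLog.add(method.getName());
            return method.invoke(server, args);
        };
        Object instance = Proxy.newProxyInstance(
                protocol.getClassLoader(), new Class<?>[] {protocol}, handler);
        return protocol.cast(instance);
    }
}
```

The caller sees only the interface type, which is why the client code in ApplicationMasterProtocolPBClientImpl can simply cast the result of RPC.getProxy to ApplicationMasterProtocolPB.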


9. ApplicationMasterProtocolPB

    package org.apache.hadoop.yarn.api;

    import ...

    @Private
    @Unstable
    @ProtocolInfo(protocolName = "org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB",
            protocolVersion = 1)
    public interface ApplicationMasterProtocolPB extends ApplicationMasterProtocolService.BlockingInterface {

    }


10. Definition of ApplicationMasterProtocolService

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationmaster_protocol.proto

    option java_package = "org.apache.hadoop.yarn.proto";
    option java_outer_classname = "ApplicationMasterProtocol";
    option java_generic_services = true;
    option java_generate_equals_and_hash = true;
    package hadoop.yarn;

    import "yarn_service_protos.proto";

    service ApplicationMasterProtocolService {
        rpc registerApplicationMaster (RegisterApplicationMasterRequestProto) returns (RegisterApplicationMasterResponseProto);
        rpc finishApplicationMaster (FinishApplicationMasterRequestProto) returns (FinishApplicationMasterResponseProto);
        rpc allocate (AllocateRequestProto) returns (AllocateResponseProto);
    }


11. ApplicationMasterProtocolPBServiceImpl (partial source)

The server-side (RM) implementation of the ApplicationMasterProtocolPB interface

    package org.apache.hadoop.yarn.api.impl.pb.service;

    import ...

    @Private
    public class ApplicationMasterProtocolPBServiceImpl implements ApplicationMasterProtocolPB {

        private ApplicationMasterProtocol real;

        // When the ResourceManager starts, real is initialized through this constructor
        public ApplicationMasterProtocolPBServiceImpl(ApplicationMasterProtocol impl) {
            this.real = impl;
        }
        ...
    }
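The server-side class is a thin translator. It receives the protobuf message, hands the call to `real` (the object passed to the constructor), and converts the result back into a protobuf message. In Hadoop, the PBImpl record classes perform that conversion.

Below is a toy version of the delegation for the register call. It assumes plain Java classes in place of generated protobuf messages. Every Toy* name is hypothetical, and the queue name it returns is made up for the example.

```java
// Hypothetical stand-ins for the generated protobuf messages.
final class ToyRegisterRequestProto {
    final String host;
    final int rpcPort;

    ToyRegisterRequestProto(String host, int rpcPort) {
        this.host = host;
        this.rpcPort = rpcPort;
    }
}

final class ToyRegisterResponseProto {
    final String queue;

    ToyRegisterResponseProto(String queue) {
        this.queue = queue;
    }
}

// Stand-in for the Java-facing ApplicationMasterProtocol (simplified signature).
interface ToyAmProtocol {
    String registerApplicationMaster(String host, int rpcPort); // returns a queue name
}

// Stand-in for ApplicationMasterService: the object that does the real work.
class ToyAmService implements ToyAmProtocol {
    @Override
    public String registerApplicationMaster(String host, int rpcPort) {
        return "root.default"; // made-up value for the example
    }
}

// Stand-in for ApplicationMasterProtocolPBServiceImpl: unwrap the message, delegate to
// `real`, then wrap the result back into a message.
class ToyPbServiceImpl {
    private final ToyAmProtocol real;

    ToyPbServiceImpl(ToyAmProtocol impl) {
        this.real = impl;
    }

    ToyRegisterResponseProto registerApplicationMaster(ToyRegisterRequestProto proto) {
        String queue = real.registerApplicationMaster(proto.host, proto.rpcPort);
        return new ToyRegisterResponseProto(queue);
    }
}
```

Keeping the translation in a separate class means the RM's business logic, the `real` object, never depends on the wire format.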


12. ApplicationMasterService (partial source)

    package org.apache.hadoop.yarn.server.resourcemanager;

    import ...

    @SuppressWarnings("unchecked")
    @Private
    public class ApplicationMasterService extends AbstractService implements
            ApplicationMasterProtocol {
        // Calls from the AM finally arrive at the methods here, which compute and return the results
        ...
    }
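Inside ApplicationMasterService#allocate, the RM compares the request's responseId with the responseId it last returned:

- If the request is exactly one step old, the RM treats it as a retried heartbeat and answers with the cached last response.
- Any other mismatch is rejected.

This check keeps a retried allocate call from producing a second allocation. The toy below models that check. ToyAllocateHandler is hypothetical and ignores everything else the real method does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy of the responseId check on the RM side of allocate().
class ToyAllocateHandler {
    static final class ToyResponse {
        final int responseId;
        final List<String> allocated;

        ToyResponse(int responseId, List<String> allocated) {
            this.responseId = responseId;
            this.allocated = allocated;
        }
    }

    // After registration the RM holds a "last response" with responseId 0.
    private ToyResponse lastResponse = new ToyResponse(0, new ArrayList<>());
    private int nextContainerId = 1;

    synchronized ToyResponse allocate(int requestResponseId, int numContainers) {
        if (requestResponseId + 1 == lastResponse.responseId) {
            // One step old: a retried heartbeat. Replay the cached answer, allocate nothing new.
            return lastResponse;
        }
        if (requestResponseId != lastResponse.responseId) {
            throw new IllegalStateException("invalid responseId " + requestResponseId
                    + ", expected " + lastResponse.responseId);
        }
        List<String> granted = new ArrayList<>();
        for (int i = 0; i < numContainers; i++) {
            granted.add("container_" + nextContainerId++);
        }
        lastResponse = new ToyResponse(lastResponse.responseId + 1, granted);
        return lastResponse;
    }
}
```

On the AM side, this check is why the client sends back the responseId it last received: the RM uses it to tell a new heartbeat from a retry of the previous one.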

【END】

Article reposted from 大数据真有意思.
