暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Hadoop RPC客户端实现原理浅析

DBA成长之道 2021-03-10
527

上一篇文章已经带大家认识了什么是Hadoop RPC,本文将从nodemanager启动流程来讲解HadoopRPC到底是如何进行远程过程调用。首先nodemanager需要和resourcemanager进行通信,双方必须定义一种通信协议,Hadoop2.0之后采用了google的protocol buffer的数据格式进行实现,协议定义在ResourceTracker.proto文件中,其内容如下:

    service ResourceTrackerService {
    rpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);
    rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);
    }

    由于protocol buffer只提供序列化框架,并未提供具体的RPC实现,因此需要用户自己实现,在ResourceTracker接口中我们可以看见它是基于ResourceTracker.proto文件定义了两个通信接口

      public interface ResourceTracker {
      @Idempotent
      public RegisterNodeManagerResponse registerNodeManager(
      RegisterNodeManagerRequest request) throws YarnException,
            IOException;
      @AtMostOnce
      public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
            throws YarnException, IOException;
      }

      分布式系统设计很多时候会采用master/slave架构,hadoop也不例外,像namenode-datanode,resourcemanager-nodemanager。slave被master管理,那么slave在启动的时候需要向master注册,注册思想是hadoop的核心设计思想之一,nodemanager在启动的时候需要向resourcemanager注册,同时其RPCEngine也需要向resourcemanager注册,因为收到远程客户端请求以后,实际上最后是让Handler来进行消息的派发,将消息交给对应的engine进行处理。这种注册的思想,可以减轻master节点的负担,因为分布式系统中master本身就是slave的消息中心,容易出现性能瓶颈,当采用注册思想时,resourcemanager无需关系nodemanager什么时候启动,什么时候停止,只需发送心跳信息即可。

      那么nondemanager究竟是如何与resourcemanager进行通信的呢?我们从nodemanager进入看看其初始化过程

      首先是NodeManager的main方法:

          public static void main(String[] args) throws IOException {
        Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
        //打印启动日志信息
        StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
            NodeManager nodeManager = new NodeManager();
        Configuration conf = new YarnConfiguration();
        new GenericOptionsParser(conf, args);
            //初始化配置并启动服务
        nodeManager.initAndStartNodeManager(conf, false);
        }

        在serviceInit方法中,我们可以看到其定义了一个用于状态更新的类,这个类也可以看作一个服务,因为它里面也定义了serviceStart()方法:

            protected NodeStatusUpdater createNodeStatusUpdater(Context context,
          Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
          return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
          metrics);
          }

          ResourceManager协议的客户端就是在NodeStatusUpdaterImpl.ServiceStart中启动的:

              @Override
            protected void serviceStart() throws Exception {
                //...
            try {
            // Registration has to be in start so that ContainerManager can get the
            // perNM tokens needed to authenticate ContainerTokens.
                  //这一步就是创建rpc客户端
            this.resourceTracker = getRMClient();
            registerWithRM();
            super.serviceStart();
            startStatusUpdater();
            } catch (Exception e) {
            //...
            }
            }

            我们继续往下看,getRMClient()里面的createProxy方法:

                protected static <T> createRMProxy(final Configuration configuration,
              final Class<T> protocol, RMProxy instance) throws IOException {
              //...
                  //判断集群是否开启了ResourcemanagerHA
              if (HAUtil.isHAEnabled(conf)) {
                  //这里其实对HA进行了一层封装,封装之后客户端不用关系具体连接哪个rs,如果开启ha,则进行封装
              RMFailoverProxyProvider<T> provider =
              instance.createRMFailoverProxyProvider(conf, protocol);
              return (T) RetryProxy.create(protocol, provider, retryPolicy);
              } else {
                   //...
              }

              这里的proxy需要说明,在进程间通信时往往需要代理来实现,proxy就是代理的意思,它实现在调用远程方法时像是在调用本地方法一样简单。

              RMFailoverProxyProvider<T> provider负责对rs进行ha封装,那它是如何进行真正对rpc对的创建呢?

              我们继续往下看,点进create()方法中:

                  public static <T> Object create(Class<T> iface,
                FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy) {
                    //proxyProvider是ConfiguredRMFailoverProxyProvider,即上文说的rs的ha封装对象
                return Proxy.newProxyInstance(
                proxyProvider.getInterface().getClassLoader(),
                new Class<?>[] { iface },
                new RetryInvocationHandler<T>(proxyProvider, retryPolicy)
                );
                }

                不难看出,这里使用了java的动态代理来代替ConfiguredRMFailoverProxyProvider对象的执行,

                而java动态代理中,每一个动态代理对象都需要继承java.lang.reflect.InvocationHandler,

                并实现其invoke方法,这里的InvocationHandler就是RetryInvocationHandler,

                ConfiguredRMFailoverProxyProvider真正的rpc其实就是在RetryInvocationHandler的

                构造方法中实现的,我们继续往下看:

                    protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
                  RetryPolicy defaultPolicy,
                  Map<String, RetryPolicy> methodNameToPolicyMap) {
                  this.proxyProvider = proxyProvider;
                  this.defaultPolicy = defaultPolicy;
                  this.methodNameToPolicyMap = methodNameToPolicyMap;
                      //proxyProvider就是ConfiguredRMFailoverProxyProvider对象
                  this.currentProxy = proxyProvider.getProxy();


                      public synchronized ProxyInfo<T> getProxy() {
                    String rmId = rmServiceIds[currentProxyIndex];
                    T current = proxies.get(rmId);
                    if (current == null) {
                    current = getProxyInternal();
                    proxies.put(rmId, current);
                    }
                    return new ProxyInfo<T>(current, rmId);
                    }


                    接下来才是真正获取代理对象:

                        private T getProxyInternal() {
                      try {
                          //获取rm地址信息ip端口等
                      final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
                      //获得代理对象
                      return RMProxy.getProxy(conf, protocol, rmAddress);
                      } catch (IOException ioe) {
                      LOG.error("Unable to create proxy to the ResourceManager " +
                      rmServiceIds[currentProxyIndex], ioe);
                      return null;
                      }
                      }

                      我们继续看RMProxy.getProxy()方法:

                          /**
                        * Get a proxy to the RM at the specified address. To be used to create a
                        * RetryProxy.
                        */
                           //对于ResourceTracker协议来说,这里的protocol就是ResourceTracker.class
                        @Private
                        static <T> T getProxy(final Configuration conf,
                        final Class<T> protocol, final InetSocketAddress rmAddress)
                        throws IOException {
                        return UserGroupInformation.getCurrentUser().doAs(
                        new PrivilegedAction<T>() {
                        @Override
                        public T run() {
                                //创建客户端代理对象
                        return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, conf);
                        }
                        });
                        }

                        YarnRPC是一个抽象类(Abstract Class),是Yarn对Hadoop RPC 的封装,Hadoop RPC有基于多种序列化方式的RPC协议,但是由于Yarn是Hadoop 2.0之后才有的组件,是很新的component, 因此Yarn所有的RPC调用都是基于google protobuf序列化方式的RPC进行的实现,其类图关系如下:


                        继续往下看Yarn.create():

                            public static YarnRPC create(Configuration conf) {
                          LOG.debug("Creating YarnRPC for " +
                          conf.get(YarnConfiguration.IPC_RPC_IMPL));
                          String clazzName = conf.get(YarnConfiguration.IPC_RPC_IMPL);
                          if (clazzName == null) {
                          clazzName = YarnConfiguration.DEFAULT_IPC_RPC_IMPL;
                          }
                          try {
                          return (YarnRPC) Class.forName(clazzName).newInstance();
                          } catch (Exception e) {
                          throw new YarnRuntimeException(e);
                          }
                          }

                          YarnRPC这个抽象类的实际实现类的名称是通过Yarn配置文件读取,因此实际上调用了HadoopYarnProtoRPC.getProxy方法:

                              public Object getProxy(Class protocol, InetSocketAddress addr,
                            Configuration conf) {
                            LOG.debug("Creating a HadoopYarnProtoRpc proxy for protocol " + protocol);
                            return RpcFactoryProvider.getClientFactory(conf).getClient(protocol, 1,
                            addr, conf);
                            }

                            这里RpcClientFactory的默认实现是RpcClientFactoryPBImpl,我们继续看RpcClientFactoryPBImpl的getClient方法:

                              //对于ResourceTracker协议来说,这里的protocol就是ResourceTracker.class
                              public Object getClient(Class<?> protocol, long clientVersion,
                              InetSocketAddress addr, Configuration conf) {

                              Constructor<?> constructor = cache.get(protocol);
                              if (constructor == null) {
                              Class<?> pbClazz = null;
                              try {
                                    //根据yarn的规定,需要根据协议名称ResourceTracker拿到具体的客户端实现ResourceTracker-》ResourceTrackerPBClientImpl
                              pbClazz = localConf.getClassByName(getPBImplClassName(protocol));
                                    } 
                                    //...
                              try {
                                  //获得ResourceTrackerPBClientImpl对象
                              Object retObject = constructor.newInstance(clientVersion, addr, conf);
                              return retObject;
                                  } 
                                  //...
                              }

                              到这里我们已经拿到了ResourceTrackerPBClientImpl对象,我们来看看它的构造方法到底干了什么事:

                                  public ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
                                    //这里进行了rpc引擎的注册,因为hadoop中不仅仅只有基于google的protocolbuffer序列化方式,还有writable
                                RPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);
                                //设置并获取protocol的代理类
                                proxy = (ResourceTrackerPB)RPC.getProxy(
                                ResourceTrackerPB.class, clientVersion, addr, conf);
                                }

                                这里我们的ResourceTrackerPBClientImpl具体实现协议想要使用,则必须向ProtocolEngine进行注册,还是体现了hadoop的注册思想,这里拿到了ResourceTracker协议的客户端实现类,此时客户端nodemanager就可以通过协议定义的方法,向rs进行注册,我们来看一下究竟是如何从客户端调用变成了服务端调用的:

                                首先registerNodeManager肯定是由NodeManager发起,而NodeManager交由NodeStatusUpdaterImpl对象与服务端进行通信,

                                    public RegisterNodeManagerResponse registerNodeManager(
                                  RegisterNodeManagerRequest request) throws YarnException,
                                        IOException {
                                  RegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();
                                  try {
                                      //proxy由上文的ResourceTrackerPBClientImpl构造函数创建
                                  return new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager(null, requestProto));
                                  } catch (ServiceException e) {
                                  RPCUtil.unwrapAndThrowException(e);
                                  return null;
                                  }
                                  }

                                  在看ResourceTrackerPBClientImpl构造方法中的RPC.getProxy,因为使用了ProtobufRpcEngine引擎,因此这里RPC.getProxy实际上是调用ProtobufRpcEngine

                                  的getProxy

                                      @Override
                                    @SuppressWarnings("unchecked")
                                    public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
                                    InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
                                    SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
                                    AtomicBoolean fallbackToSimpleAuth) throws IOException {


                                    final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
                                    rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
                                    return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
                                    protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
                                    }

                                    这里还是通过动态代理实现的,最后我们来看ProtobufRpcEngine的invoke是怎么代理客户端执行registerNodeManager方法执行的

                                        @Override
                                      public Object invoke(Object proxy, Method method, Object[] args)
                                      throws ServiceException {
                                            long startTime = 0;
                                            //...略
                                           //将方法信息打包发
                                      RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
                                            //...略
                                            //提取方法头部信息
                                      Message theRequest = (Message) args[1];
                                      final RpcResponseWrapper val;
                                      try {
                                            //将信息发送给远程服务器,remoteId包含了远程服务器的ip端口等信息
                                      val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
                                      new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
                                      fallbackToSimpleAuth);


                                      }
                                      //略
                                      }


                                      到此,整个处理就完成了,可以看到最终还是动态代理的invoke做了真正的远程过程调用。

                                      文章转载自DBA成长之道,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                      评论