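The previous article introduced what Hadoop RPC is. This article walks through the NodeManager startup flow to show how Hadoop RPC actually performs a remote procedure call. To begin with, the NodeManager has to communicate with the ResourceManager, so the two sides must agree on a communication protocol. Since Hadoop 2.0 this has been implemented with Google's Protocol Buffers data format. The protocol is defined in ResourceTracker.proto, whose content is as follows: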
service ResourceTrackerService {
  rpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);
  rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);
}
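Because Protocol Buffers only provides a serialization framework and no concrete RPC implementation, the RPC layer has to be implemented by the user. The ResourceTracker interface declares the two communication methods that correspond to the definitions in ResourceTracker.proto: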
public interface ResourceTracker {
  @Idempotent
  public RegisterNodeManagerResponse registerNodeManager(
      RegisterNodeManagerRequest request) throws YarnException, IOException;

  @AtMostOnce
  public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
      throws YarnException, IOException;
}
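Distributed systems are often designed with a master/slave architecture, and Hadoop is no exception: NameNode/DataNode and ResourceManager/NodeManager are both examples. Slaves are managed by the master, so a slave has to register with the master when it starts. Registration is one of the core design ideas in Hadoop. The NodeManager registers with the ResourceManager at startup, and the RPC engine has to be registered as well, because once a request from a remote client is received, it is ultimately a Handler that dispatches the message and hands it to the corresponding engine for processing. Registration also lightens the load on the master node: in a distributed system the master is the message hub for all slaves and easily becomes a performance bottleneck. With registration, the ResourceManager does not need to track when a NodeManager starts or stops; it only has to handle the heartbeats that the NodeManagers send.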
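The register-then-heartbeat idea described above can be illustrated with a small toy model. This is only a sketch under stated assumptions, not YARN code: the `Master` class, its method names, and the timeout handling are all hypothetical and exist purely for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model of the register-then-heartbeat pattern; not the YARN implementation. */
class RegistrationSketch {

    /** Hypothetical master: it only learns about workers that register themselves. */
    static class Master {
        // nodeId -> timestamp (ms) of the last registration or heartbeat
        private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();

        /** A worker announces itself; returns false if it was already registered. */
        boolean register(String nodeId, long now) {
            return lastSeen.putIfAbsent(nodeId, now) == null;
        }

        /** Returns false for unknown nodes so that they know to register again. */
        boolean heartbeat(String nodeId, long now) {
            return lastSeen.computeIfPresent(nodeId, (id, old) -> now) != null;
        }

        /** Drops nodes whose last sign of life is older than timeoutMs; returns how many. */
        int expire(long now, long timeoutMs) {
            int before = lastSeen.size();
            lastSeen.values().removeIf(t -> now - t > timeoutMs);
            return before - lastSeen.size();
        }

        int liveNodes() {
            return lastSeen.size();
        }
    }

    public static void main(String[] args) {
        Master master = new Master();
        master.register("nm-1", 0L);
        master.register("nm-2", 0L);
        master.heartbeat("nm-1", 900L);            // only nm-1 keeps reporting
        int expired = master.expire(1000L, 500L);  // nm-2 has been silent too long
        System.out.println("expired=" + expired + ", live=" + master.liveNodes());
    }
}
```

The master never has to discover or poll workers: everything it knows arrives through `register` and `heartbeat`, which is the property the paragraph above attributes to the registration design.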
So how exactly does the NodeManager communicate with the ResourceManager? Let us start from the NodeManager and look at its initialization.
First, the main method of NodeManager:
public static void main(String[] args) throws IOException {
  Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
  // Print the startup log message
  StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
  NodeManager nodeManager = new NodeManager();
  Configuration conf = new YarnConfiguration();
  new GenericOptionsParser(conf, args);
  // Initialize the configuration and start the services
  nodeManager.initAndStartNodeManager(conf, false);
}
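In the serviceInit method we can see that it creates a class responsible for status updates. This class can also be regarded as a service, because it defines its own serviceStart() method: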
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
    Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
  return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
      metrics);
}
The client side of the ResourceManager protocol is started in NodeStatusUpdaterImpl.serviceStart:
@Override
protected void serviceStart() throws Exception {
  //...
  try {
    // Registration has to be in start so that ContainerManager can get the
    // perNM tokens needed to authenticate ContainerTokens.
    // This step creates the RPC client
    this.resourceTracker = getRMClient();
    registerWithRM();
    super.serviceStart();
    startStatusUpdater();
  } catch (Exception e) {
    //...
  }
}
Moving on, here is the createRMProxy method that getRMClient() ends up calling:
protected static <T> T createRMProxy(final Configuration configuration,
    final Class<T> protocol, RMProxy instance) throws IOException {
  //...
  // Check whether ResourceManager HA is enabled for the cluster
  if (HAUtil.isHAEnabled(conf)) {
    // With HA enabled, the proxy is wrapped in a failover provider so that
    // the client does not need to care which RM it is actually connected to
    RMFailoverProxyProvider<T> provider =
        instance.createRMFailoverProxyProvider(conf, protocol);
    return (T) RetryProxy.create(protocol, provider, retryPolicy);
  } else {
    //...
  }
}
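A note on the proxy here: inter-process communication is usually implemented through a proxy object. The proxy makes calling a remote method look as simple as calling a local one.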
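To make the proxy idea concrete, here is a minimal sketch built on the JDK's `java.lang.reflect.Proxy`. The `Calculator` interface and `RecordingHandler` class are hypothetical names invented for this example; the handler answers locally, whereas a real RPC handler would serialize the call and send it over the network.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class DynamicProxyDemo {

    /** Hypothetical protocol interface, standing in for something like ResourceTracker. */
    interface Calculator {
        int add(int a, int b);
    }

    /** Intercepts every call made on the proxy and records the method name. */
    static class RecordingHandler implements InvocationHandler {
        final List<String> calls = new ArrayList<>();

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            calls.add(method.getName());
            // Stand-in for the remote side: compute the answer locally.
            return (Integer) args[0] + (Integer) args[1];
        }
    }

    /** Builds an object that implements Calculator but routes every call to the handler. */
    static Calculator newProxy(InvocationHandler handler) {
        return (Calculator) Proxy.newProxyInstance(
                Calculator.class.getClassLoader(),
                new Class<?>[] { Calculator.class },
                handler);
    }

    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        Calculator calc = newProxy(handler);
        // The caller sees an ordinary method call; the handler sees (method, args).
        System.out.println(calc.add(2, 3));
        System.out.println(handler.calls);
    }
}
```

The caller never implements `Calculator` by hand, which is what lets an RPC framework generate a client for any protocol interface.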
RMFailoverProxyProvider<T> provider is responsible for wrapping the RM with HA support, but how is the actual RPC proxy created?
Let us continue and step into the create() method:
public static <T> Object create(Class<T> iface,
    FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy) {
  // proxyProvider is a ConfiguredRMFailoverProxyProvider, that is, the HA
  // wrapper object for the RM mentioned above
  return Proxy.newProxyInstance(
      proxyProvider.getInterface().getClassLoader(),
      new Class<?>[] { iface },
      new RetryInvocationHandler<T>(proxyProvider, retryPolicy));
}
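The combination of a dynamic proxy with a retrying, failover-aware handler can be sketched as follows. This is a deliberately simplified illustration and not Hadoop's RetryInvocationHandler: the `Tracker` interface, the `FailoverHandler` class, and the "try the next target on any failure" policy are assumptions made for the example.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.List;

class RetryProxySketch {

    /** Hypothetical protocol interface. */
    interface Tracker {
        String register(String nodeId);
    }

    /** On failure, switches to the next target and retries, up to maxAttempts calls. */
    static class FailoverHandler<T> implements InvocationHandler {
        private final List<T> targets;
        private final int maxAttempts;
        private int current = 0;

        FailoverHandler(List<T> targets, int maxAttempts) {
            if (targets.isEmpty() || maxAttempts < 1) {
                throw new IllegalArgumentException("need at least one target and one attempt");
            }
            this.targets = targets;
            this.maxAttempts = maxAttempts;
        }

        @Override
        public synchronized Object invoke(Object proxy, Method method, Object[] args)
                throws Throwable {
            Throwable lastFailure = null;
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                try {
                    return method.invoke(targets.get(current), args);
                } catch (InvocationTargetException e) {
                    lastFailure = e.getCause();                 // what the target threw
                    current = (current + 1) % targets.size();   // fail over
                }
            }
            throw lastFailure;
        }
    }

    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> iface, List<T> targets, int maxAttempts) {
        return (T) Proxy.newProxyInstance(iface.getClassLoader(),
                new Class<?>[] { iface }, new FailoverHandler<T>(targets, maxAttempts));
    }

    public static void main(String[] args) {
        Tracker standby = id -> { throw new IllegalStateException("rm1 is standby"); };
        Tracker active = id -> "registered " + id + " at rm2";
        Tracker tracker = create(Tracker.class, Arrays.asList(standby, active), 3);
        // The caller is unaware that the first target failed.
        System.out.println(tracker.register("nm-1"));
    }
}
```

The caller holds a single `Tracker` reference and never learns which target answered, which mirrors the purpose of wrapping the RM proxy in a failover provider.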
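It is easy to see that a Java dynamic proxy is used here to stand in for the ConfiguredRMFailoverProxyProvider object. With Java dynamic proxies, every proxy instance must be given a handler that implements the java.lang.reflect.InvocationHandler interface and its invoke method. Here the InvocationHandler is RetryInvocationHandler, and the real RPC proxy behind ConfiguredRMFailoverProxyProvider is obtained in the constructor of RetryInvocationHandler. Let us continue: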
protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
    RetryPolicy defaultPolicy,
    Map<String, RetryPolicy> methodNameToPolicyMap) {
  this.proxyProvider = proxyProvider;
  this.defaultPolicy = defaultPolicy;
  this.methodNameToPolicyMap = methodNameToPolicyMap;
  // proxyProvider is the ConfiguredRMFailoverProxyProvider object
  this.currentProxy = proxyProvider.getProxy();
}
// getProxy() of the provider: reuse the cached proxy for the current RM id,
// creating it on first use
public synchronized ProxyInfo<T> getProxy() {
  String rmId = rmServiceIds[currentProxyIndex];
  T current = proxies.get(rmId);
  if (current == null) {
    current = getProxyInternal();
    proxies.put(rmId, current);
  }
  return new ProxyInfo<T>(current, rmId);
}
Only now is the proxy object actually obtained:
private T getProxyInternal() {
  try {
    // Get the RM address information (IP, port, and so on)
    final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
    // Obtain the proxy object
    return RMProxy.getProxy(conf, protocol, rmAddress);
  } catch (IOException ioe) {
    LOG.error("Unable to create proxy to the ResourceManager " +
        rmServiceIds[currentProxyIndex], ioe);
    return null;
  }
}
Next, the RMProxy.getProxy() method:
/**
 * Get a proxy to the RM at the specified address. To be used to create a
 * RetryProxy.
 */
// For the ResourceTracker protocol, protocol here is ResourceTracker.class
@Private
static <T> T getProxy(final Configuration conf,
    final Class<T> protocol, final InetSocketAddress rmAddress)
    throws IOException {
  return UserGroupInformation.getCurrentUser().doAs(
      new PrivilegedAction<T>() {
        @Override
        public T run() {
          // Create the client proxy object
          return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, conf);
        }
      });
}
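YarnRPC is an abstract class and is YARN's wrapper around Hadoop RPC. Hadoop RPC supports RPC protocols based on several serialization mechanisms, but because YARN only appeared with Hadoop 2.0 and is a relatively new component, all RPC calls in YARN are implemented on top of the Google protobuf-based RPC. Its class diagram is as follows: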

Next, YarnRPC.create():
public static YarnRPC create(Configuration conf) {
  LOG.debug("Creating YarnRPC for " +
      conf.get(YarnConfiguration.IPC_RPC_IMPL));
  String clazzName = conf.get(YarnConfiguration.IPC_RPC_IMPL);
  if (clazzName == null) {
    clazzName = YarnConfiguration.DEFAULT_IPC_RPC_IMPL;
  }
  try {
    return (YarnRPC) Class.forName(clazzName).newInstance();
  } catch (Exception e) {
    throw new YarnRuntimeException(e);
  }
}
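The pattern in YarnRPC.create, reading a class name from configuration, falling back to a default, and instantiating it reflectively, can be tried out in isolation. The sketch below uses `java.util.Properties` in place of Hadoop's Configuration, and the key `sketch.rpc.impl`, the `RpcFlavor` interface, and both implementations are hypothetical names chosen for the example.

```java
import java.util.Properties;

class PluggableFactorySketch {

    /** Hypothetical plug-in point, standing in for the abstract YarnRPC class. */
    interface RpcFlavor {
        String name();
    }

    static class ProtoFlavor implements RpcFlavor {
        public String name() { return "proto"; }
    }

    static class WritableFlavor implements RpcFlavor {
        public String name() { return "writable"; }
    }

    // Hypothetical configuration key and default, for illustration only.
    static final String IMPL_KEY = "sketch.rpc.impl";
    static final String DEFAULT_IMPL = ProtoFlavor.class.getName();

    /** Instantiates whichever implementation the configuration names. */
    static RpcFlavor create(Properties conf) {
        String className = conf.getProperty(IMPL_KEY, DEFAULT_IMPL);
        try {
            return (RpcFlavor) Class.forName(className)
                    .getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        System.out.println(create(conf).name());   // no key set: default is used
        conf.setProperty(IMPL_KEY, WritableFlavor.class.getName());
        System.out.println(create(conf).name());   // key set: configured class is used
    }
}
```

Because the caller only depends on the interface, swapping the implementation is a configuration change rather than a code change.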
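The name of the concrete implementation of the abstract YarnRPC class is read from the YARN configuration. The default is HadoopYarnProtoRPC, so the call actually goes to HadoopYarnProtoRPC.getProxy: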
public Object getProxy(Class protocol, InetSocketAddress addr,
    Configuration conf) {
  LOG.debug("Creating a HadoopYarnProtoRpc proxy for protocol " + protocol);
  return RpcFactoryProvider.getClientFactory(conf).getClient(protocol, 1,
      addr, conf);
}
Here the default implementation of RpcClientFactory is RpcClientFactoryPBImpl. Let us look at its getClient method:
// For the ResourceTracker protocol, protocol here is ResourceTracker.class
public Object getClient(Class<?> protocol, long clientVersion,
    InetSocketAddress addr, Configuration conf) {
  Constructor<?> constructor = cache.get(protocol);
  if (constructor == null) {
    Class<?> pbClazz = null;
    try {
      // By YARN convention the protocol name determines the client
      // implementation class: ResourceTracker -> ResourceTrackerPBClientImpl
      pbClazz = localConf.getClassByName(getPBImplClassName(protocol));
    }
    //...
  try {
    // Create the ResourceTrackerPBClientImpl object
    Object retObject = constructor.newInstance(clientVersion, addr, conf);
    return retObject;
  }
  //...
}
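The two ideas in getClient, deriving the implementation class from the protocol name and caching its constructor, can be sketched without any Hadoop dependency. The naming rule used here (append `ClientImpl` to the interface's binary name) is a simplification assumed for the example, and `Echo`, `EchoClientImpl`, and the single `String` constructor argument are hypothetical.

```java
import java.lang.reflect.Constructor;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ConventionLookupSketch {

    /** Hypothetical protocol interface. */
    interface Echo {
        String echo(String message);
    }

    /** Found by naming convention only; nothing refers to this class directly. */
    static class EchoClientImpl implements Echo {
        private final String address;

        EchoClientImpl(String address) {
            this.address = address;
        }

        public String echo(String message) {
            return address + ":" + message;
        }
    }

    // protocol interface -> constructor of its client implementation
    private static final ConcurrentMap<Class<?>, Constructor<?>> CACHE =
            new ConcurrentHashMap<>();

    /** Assumed convention for this sketch: interface name + "ClientImpl". */
    static String implClassName(Class<?> protocol) {
        return protocol.getName() + "ClientImpl";
    }

    static Object getClient(Class<?> protocol, String address) {
        try {
            Constructor<?> constructor = CACHE.get(protocol);
            if (constructor == null) {
                // Reflection lookup happens once per protocol, then is cached.
                Class<?> impl = Class.forName(implClassName(protocol));
                constructor = impl.getDeclaredConstructor(String.class);
                CACHE.putIfAbsent(protocol, constructor);
            }
            return constructor.newInstance(address);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("No client for " + protocol.getName(), e);
        }
    }

    static int cachedProtocols() {
        return CACHE.size();
    }

    public static void main(String[] args) {
        Echo client = (Echo) getClient(Echo.class, "rm-host:8031");
        System.out.println(client.echo("hello"));
    }
}
```

Each call still produces a fresh client object; only the class and constructor lookup is cached.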
At this point we have the ResourceTrackerPBClientImpl object. Let us see what its constructor does:
public ResourceTrackerPBClientImpl(long clientVersion,
    InetSocketAddress addr, Configuration conf) throws IOException {
  // Register the RPC engine here: Hadoop supports not only the Google
  // Protocol Buffers based serialization but also Writable
  RPC.setProtocolEngine(conf, ResourceTrackerPB.class,
      ProtobufRpcEngine.class);
  // Set up and obtain the proxy for the protocol
  proxy = (ResourceTrackerPB) RPC.getProxy(ResourceTrackerPB.class,
      clientVersion, addr, conf);
}
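For the concrete protocol implementation ResourceTrackerPBClientImpl to be usable, its protocol must first be registered with a protocol engine, which again reflects Hadoop's registration idea. We now have the client-side implementation class of the ResourceTracker protocol, so the NodeManager, acting as the client, can register with the RM through the methods defined by the protocol. Let us see how a client-side call turns into a server-side call: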
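First, registerNodeManager is of course initiated by the NodeManager, which delegates the communication with the server to its NodeStatusUpdaterImpl object. The call ends up in ResourceTrackerPBClientImpl.registerNodeManager: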
public RegisterNodeManagerResponse registerNodeManager(
    RegisterNodeManagerRequest request) throws YarnException, IOException {
  RegisterNodeManagerRequestProto requestProto =
      ((RegisterNodeManagerRequestPBImpl) request).getProto();
  try {
    // proxy was created by the ResourceTrackerPBClientImpl constructor above
    return new RegisterNodeManagerResponsePBImpl(
        proxy.registerNodeManager(null, requestProto));
  } catch (ServiceException e) {
    RPCUtil.unwrapAndThrowException(e);
    return null;
  }
}
Now look again at RPC.getProxy in the ResourceTrackerPBClientImpl constructor. Because ProtobufRpcEngine was registered as the engine, RPC.getProxy actually calls the getProxy method of ProtobufRpcEngine:
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
    InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
    SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
    AtomicBoolean fallbackToSimpleAuth) throws IOException {
  final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
      rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
  return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
      protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
}
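This is again implemented with a dynamic proxy. Finally, let us look at how the invoke method of the Invoker in ProtobufRpcEngine executes registerNodeManager on the client's behalf: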
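@Override
public Object invoke(Object proxy, Method method, Object[] args)
    throws ServiceException {
  long startTime = 0;
  //... omitted
  // Pack the method information into the request header
  RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
  //... omitted
  // The request message is the second argument; the first is the
  // controller, which the caller passes as null
  Message theRequest = (Message) args[1];
  final RpcResponseWrapper val;
  try {
    // Send the request to the remote server; remoteId holds the remote
    // server's IP, port, and related information
    val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
        new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
        fallbackToSimpleAuth);
  }
  //... omitted
}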
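This completes the whole flow. As we can see, it is ultimately the invoke method of the dynamic proxy that performs the actual remote procedure call.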
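The whole path, from a typed method call to a (method name, payload) message and back, can be condensed into a toy example. Everything here is an assumption made for illustration: `NodeProtocol` uses plain strings instead of protobuf messages, and `FakeServer` replaces the network and the server-side handlers with an in-process map.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class InvokerSketch {

    /** Hypothetical protocol; real YARN protocols exchange protobuf messages. */
    interface NodeProtocol {
        String registerNodeManager(String request);
        String nodeHeartbeat(String request);
    }

    /** In-process stand-in for the network plus the server-side dispatch table. */
    static class FakeServer {
        private final Map<String, Function<String, String>> handlers = new HashMap<>();

        void registerHandler(String methodName, Function<String, String> handler) {
            handlers.put(methodName, handler);
        }

        String call(String methodName, String payload) {
            Function<String, String> handler = handlers.get(methodName);
            if (handler == null) {
                throw new IllegalArgumentException("Unknown method: " + methodName);
            }
            return handler.apply(payload);
        }
    }

    /** Client side: turns each method call into a (method name, payload) message. */
    static class Invoker implements InvocationHandler {
        private final FakeServer server;

        Invoker(FakeServer server) {
            this.server = server;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            String header = method.getName();   // plays the role of the request header
            String payload = (String) args[0];  // plays the role of the request message
            return server.call(header, payload);
        }
    }

    static NodeProtocol newClient(FakeServer server) {
        return (NodeProtocol) Proxy.newProxyInstance(
                NodeProtocol.class.getClassLoader(),
                new Class<?>[] { NodeProtocol.class },
                new Invoker(server));
    }

    public static void main(String[] args) {
        FakeServer server = new FakeServer();
        server.registerHandler("registerNodeManager", req -> "accepted " + req);
        server.registerHandler("nodeHeartbeat", req -> "ack " + req);

        NodeProtocol client = newClient(server);
        System.out.println(client.registerNodeManager("nm-1"));
        System.out.println(client.nodeHeartbeat("nm-1"));
    }
}
```

In the real engine the header and payload are serialized and sent through the IPC client, but the division of labor is the same: the proxy supplies the typed interface and invoke does the remote work.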




