ApplicationMaster<-->ResourceManager
Roles and interactions in a "generic" YARN application:
RM: ResourceManager
AM: ApplicationMaster
NM: NodeManager
Main communication protocols used in these interactions:
ApplicationClientProtocol
ApplicationMasterProtocol
ContainerManagementProtocol
Client<-->ResourceManager
The client program interacts with the RM through a YarnClient object.
ApplicationMaster<-->ResourceManager
The AM interacts with the RM through an AMRMClientAsync object;
AMRMClientAsync.CallbackHandler processes the resulting events asynchronously.
ApplicationMaster<-->NodeManager
The AM interacts with the NMs through an NMClientAsync object, mainly to start Containers;
NMClientAsync.CallbackHandler processes Container events asynchronously.
The proto message definitions for the interface requests and responses are in:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
Hadoop version: 3.2.1
Flink version: 1.10
1. Example: the JobManager process YarnJobClusterEntrypoint in Flink's YARN per-job mode
// The starting point is the YarnJobClusterEntrypoint#main method
// The end point is YarnResourceManager

/**
 * The yarn implementation of the resource manager. Used when the system is started
 * via the resource framework YARN.
 */
public class YarnResourceManager extends ActiveResourceManager<YarnWorkerNode>
        implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {

    // This is the ApplicationMaster everyone talks about
    ...

    /** resourceManagerClient talks to the ResourceManager. Client to communicate with the Resource Manager (YARN's master). */
    private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;

    /** nodeManagerClient talks to the NodeManagers. Client to communicate with the Node manager and launch TaskExecutor processes. */
    private NMClientAsync nodeManagerClient;

    ...
}
AMRMClientAsync
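An abstract class that handles communication with the ResourceManager and delivers events asynchronously, such as Container allocation and Container completion. It runs a thread that periodically sends heartbeats to the ResourceManager.
A YARN application uses it together with its own implementation of the AMRMClientAsync.CallbackHandler callback interface.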
2. A simple example: MyCallbackHandler
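A minimal sketch of the callback pattern, assuming nothing beyond the JDK. SketchCallbackHandler, SketchAsyncClient and fakeRmGrant are hypothetical stand-ins, not Hadoop classes; the real AMRMClientAsync.CallbackHandler has more callbacks (for example onContainersCompleted and onShutdownRequest). The point is only that a heartbeat thread, not the caller, invokes the handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for AMRMClientAsync.CallbackHandler; not the Hadoop interface.
interface SketchCallbackHandler {
    void onContainersAllocated(List<String> containerIds);
    void onError(Throwable e);
}

// Hypothetical stand-in for the async client: a heartbeat thread polls a fake RM
// and hands whatever was granted to the handler.
class SketchAsyncClient {
    private final long intervalMs;
    private final SketchCallbackHandler handler;
    private final List<String> pendingGrants = new ArrayList<>(); // guarded by "this"
    private volatile boolean running;
    private Thread heartbeat;

    SketchAsyncClient(long intervalMs, SketchCallbackHandler handler) {
        this.intervalMs = intervalMs;
        this.handler = handler;
    }

    // Test hook: pretend the RM grants this container on the next heartbeat.
    synchronized void fakeRmGrant(String containerId) {
        pendingGrants.add(containerId);
    }

    private synchronized List<String> drainGrants() {
        List<String> out = new ArrayList<>(pendingGrants);
        pendingGrants.clear();
        return out;
    }

    void start() {
        running = true;
        heartbeat = new Thread(() -> {
            while (running) {
                try {
                    List<String> granted = drainGrants();
                    if (!granted.isEmpty()) {
                        handler.onContainersAllocated(granted);
                    }
                    Thread.sleep(intervalMs);
                } catch (InterruptedException e) {
                    return; // stop() interrupts the sleep
                } catch (RuntimeException e) {
                    handler.onError(e);
                }
            }
        }, "sketch-heartbeat");
        heartbeat.setDaemon(true);
        heartbeat.start();
    }

    void stop() {
        running = false;
        heartbeat.interrupt();
        try {
            heartbeat.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// The application-side handler: it only collects what it is told about.
class MyCallbackHandler implements SketchCallbackHandler {
    final List<String> received = new CopyOnWriteArrayList<>();
    final CountDownLatch latch;

    MyCallbackHandler(int expected) {
        latch = new CountDownLatch(expected);
    }

    public void onContainersAllocated(List<String> containerIds) {
        for (String id : containerIds) {
            received.add(id);
            latch.countDown();
        }
    }

    public void onError(Throwable e) {
        e.printStackTrace();
    }
}

class CallbackSketch {
    static List<String> run() {
        MyCallbackHandler handler = new MyCallbackHandler(2);
        SketchAsyncClient client = new SketchAsyncClient(10, handler);
        client.start();
        client.fakeRmGrant("container_1");
        client.fakeRmGrant("container_2");
        try {
            handler.latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        client.stop();
        return new ArrayList<>(handler.received);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The handler never calls the fake RM itself; it reacts to what the heartbeat thread delivers, which is the same division of labor the async client imposes on an AM.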

AMRMClientAsync client lifecycle
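The usual call order on the client is init, start, registerApplicationMaster, addContainerRequest, unregisterApplicationMaster, stop. The sketch below only models that order with a hypothetical LifecycleSketchClient; the strict state checks and the simplified parameter lists are assumptions made for illustration, not a description of what the Hadoop client enforces.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in that only records and checks the order of lifecycle calls.
class LifecycleSketchClient {
    enum State { NOTINITED, INITED, STARTED, REGISTERED, UNREGISTERED, STOPPED }

    private State state = State.NOTINITED;
    final List<String> calls = new ArrayList<>();

    private void move(State expected, State next, String step) {
        if (state != expected) {
            throw new IllegalStateException(step + " is not allowed in state " + state);
        }
        state = next;
        calls.add(step);
    }

    void init() { move(State.NOTINITED, State.INITED, "init"); }

    void start() { move(State.INITED, State.STARTED, "start"); }

    void registerApplicationMaster(String host, int port, String trackingUrl) {
        move(State.STARTED, State.REGISTERED, "registerApplicationMaster");
    }

    void addContainerRequest(String request) {
        move(State.REGISTERED, State.REGISTERED, "addContainerRequest");
    }

    void unregisterApplicationMaster(String finalStatus) {
        move(State.REGISTERED, State.UNREGISTERED, "unregisterApplicationMaster");
    }

    void stop() { move(State.UNREGISTERED, State.STOPPED, "stop"); }
}

class LifecycleSketch {
    static List<String> run() {
        LifecycleSketchClient client = new LifecycleSketchClient();
        client.init();
        client.start();
        client.registerApplicationMaster("am-host", 0, "");
        client.addContainerRequest("1 vcore, 1024 MB");
        // ... the application would wait here until its work completes ...
        client.unregisterApplicationMaster("SUCCEEDED");
        client.stop();
        return client.calls;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```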

3. AMRMClientAsync source (excerpt)


4. AMRMClientAsyncImpl source (excerpt)

5. AMRMClient source (excerpt)
package org.apache.hadoop.yarn.client.api;

import ...

// Abstract class AMRMClient
public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
        AbstractService {
    ...
}
6. AMRMClientImpl source (excerpt)

7. ApplicationMasterProtocol source (excerpt)
The ApplicationMasterProtocol interface is fairly simple, with only three methods:
package org.apache.hadoop.yarn.api;

import ...

// The ApplicationMasterProtocol interface
public interface ApplicationMasterProtocol {

    // Register itself (the AM) with the RM
    public RegisterApplicationMasterResponse registerApplicationMaster(
            RegisterApplicationMasterRequest request)
            throws YarnException, IOException;

    // Ask the RM to unregister this AM; the AM may have finished successfully,
    // or the application may have failed
    public FinishApplicationMasterResponse finishApplicationMaster(
            FinishApplicationMasterRequest request)
            throws YarnException, IOException;

    // The main interface (method) between the AM and the RM: it handles an
    // AllocateRequest and returns an AllocateResponse.
    // This is the well-known Container request; requests and responses are batched
    // (for example, a Flink JobManager requesting 3 TaskManagers at once).
    // Executed at most once; no duplicate or excess allocation.
    public AllocateResponse allocate(AllocateRequest request)
            throws YarnException, IOException;
}
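The three methods form a simple conversation: register, call allocate repeatedly (it also serves as the heartbeat), then finish. A minimal sketch of that conversation against an in-memory fake follows. SketchAmProtocol, FakeRm and the maxPerCall limit are hypothetical and use plain strings and ints in place of the Hadoop request and response types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified mirror of the three protocol methods; not the Hadoop types.
interface SketchAmProtocol {
    void registerApplicationMaster(String amHost);
    List<String> allocate(int newAsks); // also acts as the heartbeat
    void finishApplicationMaster(String finalStatus);
}

// In-memory fake RM that grants at most maxPerCall containers per allocate call.
class FakeRm implements SketchAmProtocol {
    private final int maxPerCall;
    private boolean registered;
    private int outstanding;
    private int nextId = 1;
    String finalStatus;

    FakeRm(int maxPerCall) {
        this.maxPerCall = maxPerCall;
    }

    public void registerApplicationMaster(String amHost) {
        registered = true;
    }

    public List<String> allocate(int newAsks) {
        if (!registered) {
            throw new IllegalStateException("AM is not registered");
        }
        outstanding += newAsks;
        int n = Math.min(outstanding, maxPerCall);
        List<String> granted = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            granted.add("container_" + nextId++);
        }
        outstanding -= n;
        return granted;
    }

    public void finishApplicationMaster(String finalStatus) {
        this.finalStatus = finalStatus;
        registered = false;
    }
}

class ProtocolSketch {
    // Plays the AM role: ask once for everything, then heartbeat until satisfied.
    static List<String> runAm(SketchAmProtocol rm, int wanted) {
        rm.registerApplicationMaster("am-host");
        List<String> got = new ArrayList<>(rm.allocate(wanted));
        while (got.size() < wanted) {
            got.addAll(rm.allocate(0)); // heartbeat with no new asks
        }
        rm.finishApplicationMaster("SUCCEEDED");
        return got;
    }

    public static void main(String[] args) {
        FakeRm rm = new FakeRm(2);
        System.out.println(runAm(rm, 3));
        System.out.println(rm.finalStatus);
    }
}
```

With a limit of 2 per call, the request for 3 containers is satisfied over two allocate calls, which illustrates why an AM keeps calling allocate even when it has nothing new to ask for.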
8. ApplicationMasterProtocolPBClientImpl source (excerpt)
package org.apache.hadoop.yarn.api.impl.pb.client;

import ...

// Client-side implementation of the ApplicationMasterProtocol interface
public class ApplicationMasterProtocolPBClientImpl implements ApplicationMasterProtocol, Closeable {

    private ApplicationMasterProtocolPB proxy;

    public ApplicationMasterProtocolPBClientImpl(long clientVersion, InetSocketAddress addr,
            Configuration conf) throws IOException {
        RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class, ProtobufRpcEngine.class);
        // Under the hood this calls java.lang.reflect.Proxy#newProxyInstance
        proxy = (ApplicationMasterProtocolPB) RPC.getProxy(ApplicationMasterProtocolPB.class, clientVersion,
                addr, conf);
    }

    ...
}
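Since the proxy is ultimately created with java.lang.reflect.Proxy#newProxyInstance, a JDK-only sketch of that mechanism may help. SketchProtocolPB and RecordingHandler are hypothetical; a real RPC engine would serialize the request and send it over the network where this handler just records the call.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a blocking protocol interface; not ApplicationMasterProtocolPB.
interface SketchProtocolPB {
    String registerApplicationMaster(String request);
    String allocate(String request);
}

// Every call on the proxy lands here. A real RPC engine would serialize the
// request and send it to the server instead of recording it in a list.
class RecordingHandler implements InvocationHandler {
    final List<String> sent = new ArrayList<>();

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(this, args); // toString/hashCode/equals
        }
        sent.add(method.getName() + "(" + args[0] + ")");
        return "response:" + method.getName();
    }
}

class ProxySketch {
    static SketchProtocolPB getProxy(InvocationHandler handler) {
        return (SketchProtocolPB) Proxy.newProxyInstance(
                SketchProtocolPB.class.getClassLoader(),
                new Class<?>[] {SketchProtocolPB.class},
                handler);
    }

    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        SketchProtocolPB proxy = getProxy(handler);
        System.out.println(proxy.allocate("ask=3"));
        System.out.println(handler.sent);
    }
}
```

The caller holds an object that implements the interface, yet no class implementing it was ever written by hand; that is what lets one generic handler serve every protocol method.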
9. ApplicationMasterProtocolPB
package org.apache.hadoop.yarn.api;

import ...

@Private
@Unstable
@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB",
        protocolVersion = 1)
public interface ApplicationMasterProtocolPB extends ApplicationMasterProtocolService.BlockingInterface {
}
10. Definition of ApplicationMasterProtocolService
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationmaster_protocol.proto
option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "ApplicationMasterProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;

import "yarn_service_protos.proto";

service ApplicationMasterProtocolService {
  rpc registerApplicationMaster (RegisterApplicationMasterRequestProto) returns (RegisterApplicationMasterResponseProto);
  rpc finishApplicationMaster (FinishApplicationMasterRequestProto) returns (FinishApplicationMasterResponseProto);
  rpc allocate (AllocateRequestProto) returns (AllocateResponseProto);
}
11. ApplicationMasterProtocolPBServiceImpl source (excerpt)
The server-side (RM) implementation of the ApplicationMasterProtocolPB interface
package org.apache.hadoop.yarn.api.impl.pb.service;

import ...

@Private
public class ApplicationMasterProtocolPBServiceImpl implements ApplicationMasterProtocolPB {

    private ApplicationMasterProtocol real;

    // When the ResourceManager starts, it initializes the real object through this constructor
    public ApplicationMasterProtocolPBServiceImpl(ApplicationMasterProtocol impl) {
        this.real = impl;
    }

    ...
}
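The constructor above points at a delegation pattern: the PB-facing class receives a wire-level message, converts it to the typed form, and forwards it to the wrapped real object. A minimal sketch under that assumption is shown below. All names are hypothetical, and the "ask=N" / "granted=N" strings merely stand in for protobuf messages.

```java
// Hypothetical typed protocol, standing in for ApplicationMasterProtocol.
interface SketchTypedProtocol {
    int allocate(int ask); // returns the number of containers granted
}

// Hypothetical wire-level protocol, standing in for ApplicationMasterProtocolPB.
interface SketchWireProtocol {
    String allocate(String requestProto);
}

// Adapter in the role of the PB service implementation: decode, delegate, encode.
class SketchPbServiceImpl implements SketchWireProtocol {
    private final SketchTypedProtocol real;

    SketchPbServiceImpl(SketchTypedProtocol impl) {
        this.real = impl;
    }

    public String allocate(String requestProto) {
        int ask = Integer.parseInt(requestProto.substring("ask=".length())); // decode
        int granted = real.allocate(ask);                                    // delegate
        return "granted=" + granted;                                         // encode
    }
}

// In the role of the service that does the actual work; the cap of 2 is arbitrary.
class SketchMasterService implements SketchTypedProtocol {
    public int allocate(int ask) {
        return Math.min(ask, 2);
    }
}

class DelegationSketch {
    public static void main(String[] args) {
        SketchWireProtocol server = new SketchPbServiceImpl(new SketchMasterService());
        System.out.println(server.allocate("ask=3"));
    }
}
```

Keeping the conversion in the adapter means the class that does the real work never sees the wire format.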
12. ApplicationMasterService source (excerpt)
package org.apache.hadoop.yarn.server.resourcemanager;

import ...

@SuppressWarnings("unchecked")
@Private
public class ApplicationMasterService extends AbstractService implements
        ApplicationMasterProtocol {

    // Calls ultimately end up in the methods here, which return the results
    ...
}