暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

手把手教你写一个RPC框架

山人彤 2021-03-04
728

0 前言1 总体介绍2 common模块    2.1 ReflectionUtils.java3 codec模块    3.1 序列化接口Encoder.java    3.2 json序列化实现JSONEncoder.java    3.3 反序列化接口Decoder.java    3.4 反序列化实现JSONDecoder.java4 proto模块    4.1 Peer.java    4.2 Request.java    4.3 ServiceDescriptor类    4.4 Response.java类5 transport网络模块    5.1 TransportClient接口    5.2 HTTPTransportClient.java    5.3 RequsetHandler.java    5.4 TransportServer.java    5.5 HTTPTransportServer.java6 server模块    6.1 RpcServer.java    6.2 RpcServerConfig.java    6.3 ServiceInstance.java    6.4 ServiceInvoker.java    6.5 ServiceManager.java7 client模块    7.1 TransportSelector.java    7.2 RandomTransportSelector.java    7.3 RpcClientConfig.java    7.4 RemoteInvoker.java    7.5 RpcClient.java8 Example    8.1 CalcService.java    8.2 CalcServiceImpl.java    8.3 Client.java    8.4 Server.java    8.5 运行9 完整工程与代码


0 前言


本文介绍一个RPC的简单实现,RPC的原理可以参考这里:3分钟搞懂RPC原理

涉及的技术:

  • 基础知识

    JavaCore,Maven,反射

  • 动态代理(生成client存根实际调用对象)

    Java的动态代理

  • 序列化(Java对象与二进制数据互转)

    fastjson

    序列化:Java对象转为二进制数组

    反序列化:二进制数组转为Java对象

  • 网络通信(传输序列化后的数据)

    jetty,URLConnection

本文只会大致介绍每个模块,以及模块中每个类的成员与方法,完整的工程链接和代码会放在文章结尾的链接中。

1 总体介绍

rpc实现有五个模块:

协议模块:定义了其他模块需要的java bean

序列化模块:定义了序列化和反序列化方法

网络模块:实现底层的网络通信,使用了jetty

server模块:rpc server端

client模块:rpc client端

模块之间的依赖关系如下图所示:

2 common模块

common模块提供了公共的代码,供其他模块使用。

2.1 ReflectionUtils.java

rpc的实现中,大量使用了反射方式来获取类对象。ReflectionUtils类提供了获取类对象执行类方法的函数。

public class ReflectionUtils {
   /**
    * 根据clazz创建对象
    * @param clazz 待创建对象的类
    * @param <T> 对象类型
    * @return 创建好的对象
    */

   public static <T> newInstance(Class<T> clazz);

   /**
    * 获取某个class的公有方法
    * @param clazz
    * @return
    */

   public static Method[] getPublicMethods(Class clazz);

   /**
    * 调用指定对象的指定方法
    * @param obj 被调用方法的对象
    * @param method 被调用的方法
    * @param args 方法的参数
    * @return 返回结果
    */

   public static Object invoke(Object obj,Method method,Object... args);
}


3 codec模块

序列化模块。数据在网络中进行传输时,都是二进制。所以发送方,需要先将对象转为二进制,这个过程称为序列化。接收方在接收到二进制数据后,将二进制对象转为对象,这个过程称为反序列化


3.1 序列化接口Encoder.java

将一个对象 序列化为二进制数组

public interface Encoder {
   byte[] encode(Object obj);
}

3.2 json序列化实现JSONEncoder.java

将对象转为json形式,然后再转为二进制数组

/**
* 序列化,将json对象转为二进制数组
*/

public byte[] encode(Object obj);


3.3 反序列化接口Decoder.java

将二进制数组转为对象

/**
    * 反序列化,将二进制数组转为对象
    * @param bytes 二进制数组
    * @param clazz 待转的类型
    * @param <T> 泛型
    * @return 返回的对象
    */

   <T>decode(byte[] bytes,Class<T> clazz);

3.4 反序列化实现JSONDecoder.java

将二进制数组反序列化为json格式,然后将json转为对象

public <T> decode(byte[] bytes, Class<T> clazz);


4 proto模块

协议模块,定义了其他模块需要的bean类。

4.1 Peer.java

定义了端口和ip信息

public class Peer {
   private String host; //ip
   private int port;  //端口
}

4.2 Request.java

client端向server通信时的请求类

public class Request {
   //请求的函数的描述信息
   private ServiceDescriptor service;
   //请求的参数
   private Object[] parameters;
}

4.3 ServiceDescriptor类

请求的描述信息,Server根据这个信息,来找到对应的函数执行。由于这个类之后需要查找与比较,所以需要重写equals方法以及hashCode方法。这个类还提供了一个静态方法from,来根据类名和方法名,来生成一个Service描述对象ServiceDescriptor。

public class ServiceDescriptor {
   private String clazz;  //类名
   private String method; //方法名
   private String returnType; //返回类型
   private String[] parameterTypes; //参数类型
   
   @Override
   public boolean equals(Object o) {
       if(this == o)return true;
       if(o == null || getClass() != o.getClass()) return false;

       ServiceDescriptor that = (ServiceDescriptor)o;
       return this.toString().equals(o.toString());
  }

   @Override
   public int hashCode() {
       return toString().hashCode();
  }

   @Override
   public String toString() {
       return     "clazz="+ clazz
               + ",method="+ method
               +",returnType="+ returnType
               +",parameterTypes="+Arrays.toString(parameterTypes);
  }
       /**
    * 通过clazz 和 method 抽取出方法的输入输出参数,生成ServiceDescript
    * @param clazz
    * @param method
    * @return
    */

   public static ServiceDescriptor from(Class clazz, Method method);
}


4.4 Response.java类

rpc调用之后的返回数据类

public class Response {
    /**
     * 服务返回编码:0-成功 非0失败
     */

    private int code =0;
    private String message = "ok"//错误信息
    private Object data; //返回数据对象
}


5 transport网络模块

这个模块是rpc最底层的网络通信模块,这里使用http来实现的。

5.1 TransportClient接口

client端的网络模块。在这里用的http短连接来实现的网络通信,在一般的rpc中,其实是用的tcp长连接来实现的  。

/**
 * 1. 创建连接
 * 2. 发送数据,并且等待响应
 * 3. 关闭连接
 */

public interface TransportClient {
    //连接,在实现中就是为url变量赋值
    void connect(Peer peer);

    //写数据
    InputStream write(InputStream data);

    //关闭连接
    void close();
}


5.2 HTTPTransportClient.java

TransportClient实现类,功能见接口类


5.3 RequsetHandler.java

server端处理client发过来的请求的类。由server模块调用网络接口时,作为参数传递给TransportServer来处理client的请求。

/**
 * 处理网络请求的handler
 */

public interface RequestHandler {
    void onRequest(InputStream recive, OutputStream toResp);
}


5.4 TransportServer.java

Sever端的网络模块接口

/**
 * 1. 启动、监听
 * 2. 接受请求
 * 3. 关闭监听
 * 在servlet中调用handler来处理客户端请求
 */

public interface TransportServer {
    /**
     * 初始化,设置servlet
     * @param port 端口
     * @param handler 处理请求的方法
     */

    void init(int port,RequestHandler handler);
    //开始服务
    void start();
    //停止服务
    void stop();
}


5.5 HTTPTransportServer.java

TransportServer的实现类


6 server模块

server部分执行流程如下所示:

6.1 RpcServer.java

server端类

public class RpcServer {
    
    private RpcServerConfig config = new RpcServerConfig();  //server配置类
    private TransportServer net;                             //server网络模块
    private Encoder encoder;                                 //序列化类
    private Decoder decoder;                                 //反序列化类
    private ServiceManager serviceManager;                   //service管理类
    private ServiceInvoker serviceInvoker;                   //servive执行类

    /**
         * 注册函数,将一个类注册为rpc service,其中的所有public方法会被注册为rpc service
         * 这个函数会调用ServiceManager类的register方法
         * @param interfaceClass 注册类的接口
         * @param bean           注册类的实现类
         * @param <T>            泛型
         */

    public <T> void register(Class<T>interfaceClass,T bean);

    /**
         * 开启网络模块
         */

    public void start();

    /**
         * 停止网络模块
         */

    public void stop();

    /**
         * 处理网络请求的实现类
         * 在初始化网络模块的时候,作为参数传入
         * 先将input的二进制数据读取出来,然后反序列化成Request类
         * 再通过ServiceManager类的lookup函数找到该service
         * 然后通过ServiceInvoker类的invoke方法来执行服务
         * 然后将结果序列化,返回
         */

    private RequestHandler handler = new RequestHandler() {
        ...
    }
}


6.2 RpcServerConfig.java

Server配置类

public class RpcServerConfig {
    private Class<? extends TransportServer > transportClass = HTTPTransportServer.class;

    private Class<? extends Encoder> encoderClass = JSONEncoder.class;
    private Class<? extends Decoder> decoderClass = JSONDecoder.class;
    private int port =3000;
}


6.3 ServiceInstance.java

Service实例类

public class ServiceInstance {
    private Object target;
    private Method method;
}

6.4 ServiceInvoker.java

调用具体服务

public class ServiceInvoker {
    public Object invoke(ServiceInstance service, Request request);   
}


6.5 ServiceManager.java

Service管理类,负责注册服务,查找服务。

public class ServiceManager {
    private Map<ServiceDescriptor,ServiceInstance>  services;

    //注册服务
    public <T> void register(Class<T> interfaceClass,T bean);

    //查找服务
    public ServiceInstance lookup(Request request);
}


7 client模块

client的执行步骤如下图所示:

7.1 TransportSelector.java

网络选择类,client对一个server可以有多个连接,以便负载均衡,所以在进行一次rpc时,需要选择一个连接。

/**
 * 表示选择哪个server去连接
 */

public interface TransportSelector {
    /**
     * 初始化selector
     * @param peers 可以连接的server端点信息
     * @param count client 与server建立多少个连接
     * @param clazz client实现class
     */

    void init(List<Peer> peers,int count,Class<? extends TransportClient> clazz);
    /**
     * 选择一个transport与server做交互
     * @return 网络Client
     */

    TransportClient select();
    /**
     * 释放用完的client
     * @param client
     */

    void release(TransportClient client);
    void close();
}


7.2 RandomTransportSelector.java

TransportSelector的实现类,实现随机选择一个连接。


7.3 RpcClientConfig.java

配置类,可以对Client进行一些配置

public class RpcClientConfig {
    private Class<? extends TransportClient> transportClass = HTTPTransportClient.class;
    private Class <? extends Encoder> encoderClass = JSONEncoder.class;
    private Class <? extends Decoder> decoderClass = JSONDecoder.class;
    private Class<? extends TransportSelector> selectorClass = RandomTransportSelector.class;

    private int connectCount =1;
    private List<Peer> servers = Arrays.asList(new Peer("127.0.0.1",3000));
}


7.4 RemoteInvoker.java

动态代理的代理类。在Client端,为了实现rpc,通过动态代理,增强原来的类。

/**
 * 调用远程服务的代理类
 */

@Slf4j
public class RemoteInvoker implements InvocationHandler {
    private Encoder encoder; //序列化
    private Class clazz;     //需要的服务类
    private Decoder decoder; //反序列化
    private TransportSelector selector;  //网络服务选择类
    
     /**
     * 代理类
     * proxy:
     * method:想调用方法
     * args: 方法的参数
     * 返回:方法的返回参数
     */

    public Object invoke(Object proxy,Method method,Object[] args) throws Throwable;
     
    /**
     * 远程执行,被invoke调用
     * @param request client请求
     * @return 返回 Response类
     */

    private Response invokeRemote(Request request);
    
}


7.5 RpcClient.java

Client类,将原有类动态代理为具有rpc功能的类。

public class RpcClient {
    private RpcClientConfig config = new RpcClientConfig();
    private Encoder encoder;
    private Decoder decoder;
    private TransportSelector selector;
     /**
     * 获取 RemoteInvoker 动态代理类
     * @param clazz
     * @param <T>
     * @return
     */

    public <T> getProxy(Class<T> clazz);
}


8 Example

一个实现上述实现的rpc的例子。例子是一个计算器的rpc调用,包括两个函数:加,减。

8.1 CalcService.java

计算类接口,Client端只需要接口类即可rpc。

public interface CalcService {
    int add(int a,int b);

    int minus(int a,int b);
}


8.2 CalcServiceImpl.java

计算实现类

package com.qzq.gkrpc.example;

public class CalcServiceImpl implements CalcService{

    @Override
    public int add(int a, int b) {
        return a+b;
    }

    @Override
    public int minus(int a, int b) {
        return a-b;
    }
}


8.3 Client.java

rpc的Client端

package com.qzq.gkrpc.example;
import com.qzq.gkrpc.client.RpcClient;

public class Client {
    public static void main(String[] args) {
        RpcClient client = new RpcClient();
        CalcService  service = client.getProxy(CalcService.class);

        int r1 = service.add(1,2);
        int r2 = service.minus(10,8);

        System.out.println(r1);
        System.out.println(r2);
    }
}


8.4 Server.java

rpc的Server端

package com.qzq.gkrpc.example;

import com.qzq.gkrpc.server.RpcServer;

public class Server {
    public static void main(String[] args) {
        RpcServer server = new RpcServer();
        server.register(CalcService.class,new CalcServiceImpl());
        server.start();
    }
}


8.5 运行

先启动server端

然后运行client

成功!


9 完整工程与代码

关注公众号【山人彤】,后台回复【RPC】即可免费领取


文章转载自山人彤,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论