这里面是远程通讯的核心,包含了网络通信、编解码协议、远程调用、注册中心、负载均衡等核心代码都在这里面,下面就详细分析下;
Tips:这里的文章有点滞后性,实际代码有一点修改,所以在看这里的内容的时候最好是跟着代码一起看,但是核心思路和绝大部分代码都是一样的,有一些修改:
•将实际调用改成了异步;•新增了优雅上下线;•新增了filter和快速失败等;
不过这些看下代码就懂了,很简单;所以这个文章影响不大,只是为了让新手能更好理解;
项目目录
.└── core├── compress│ ├── DefaultCompressor.java│ └── GzipCompressor.java├── config│ ├── ConfigManager.java│ └── loader│ ├── PropertiesConfigLoader.java│ └── SystemPropertyLoader.java├── loadbalance│ ├── AbstractLoadBalance.java│ ├── LoadBalanceFactory.java│ └── rule│ ├── random│ │ ├── RandomRule.java│ │ └── RandomWeightRule.java│ └── round│ ├── NormalWeightRoundRule.java│ ├── RoundRule.java│ └── SmoothWeightRoundRule.java├── network│ ├── cache│ │ ├── ConnectCache.java│ │ ├── RegisterInfoCache.java│ │ └── SimpleRpcServiceCache.java│ ├── client│ │ ├── ClientSocketHandler.java│ │ └── RpcClientSocket.java│ ├── codec│ │ ├── RpcDecoder.java│ │ ├── RpcEncoder.java│ │ ├── RpcMessageDecoder.java│ │ └── RpcMessageEncoder.java│ ├── message│ │ ├── Request.java│ │ ├── Response.java│ │ └── RpcMessage.java│ ├── send│ │ ├── SyncWrite.java│ │ ├── SyncWriteFuture.java│ │ ├── SyncWriteMap.java│ │ └── WriteFuture.java│ └── server│ ├── RpcServerSocket.java│ ├── ServerSocketHandler.java│ └── hook│ └── ServerExitHook.java├── reflect│ ├── RpcInvocationHandler.java│ └── RpcProxy.java├── register│ ├── AbstractRegisterCenter.java│ ├── RegisterCenterFactory.java│ └── strategy│ ├── LocalRegisterCenter.java│ └── RedisRegisterCenter.java└── serializer└── ProtostuffSerializer.java
简单介绍下项目:
•compress
:这个下面是压缩类的实现,当然其对应的接口是放在了common项目里面,然后也提供了SPI机制,可以使用自己提供的实现;•config
:这个包下面就是配置加载器相关的实现
1.ConfigLoader
:这个是对应的文件加载方式,其实就是从key=value的格式总通过key获取对应的value值;2.ConfigManager
:这里面就是包含了文件扫描,设置值对象等;
•loadbalance
:这里面就是对应的负载均衡算法;•network
:这里面就是对应netty相关的东西,编解码、nettyserver、nettyclient等;•reflect
:反射相关的东西,其实应该叫动态代理相关的东西;远程调用的核心也在这里;•register
:这个是注册中心在的地方,本来是打算单独拿出去做个小模块的,后面觉得注册中心单独使用的概率不大,不过后续可以考虑,但是纬度没有想好,是以中间件作维度,还是以制定数据格式作维度需要思考一下;•serializer
:序列化相关的东西;
SPI机制
这里讲讲RPC的SPI
机制,java里面自带的是Service Provider Interface
;其实就是提供接口类,但是实现由三方提供;不太知道的同学可以随便找个文章看看;
接口解析
simple-rpc里面的话,也是参考这种规则,其实实现也比较简单,一个用于方便获取数据的holder :ExtensionHolder、
和一个具体实现的加载器:ExtensionLoader
;跟着代码看:
/** 扩展类实例缓存 {name: 扩展类实例} */private final Map<String, T> extensionsCache = new ConcurrentHashMap<>(8);/** 扩展加载器实例缓存 {类型:加载器实例} */private static final Map<Class<?>, ExtensionLoader<?>> extensionLoaderCache = new ConcurrentHashMap<>(8);/** 扩展类配置列表缓存 {type: {name, 扩展类}} */private final ExtensionHolder<Map<String, Class<?>>> extensionClassesCache = new ExtensionHolder<>();/** 创建扩展实例类的锁缓存 {name: synchronized 持有的锁} */private final Map<String, Object> createExtensionLockMap = new ConcurrentHashMap<>(8);/** 扩展类加载器的类型 */private final Class<T> type;/** 扩展类存放的目录地址 */private static final String EXTENSION_PATH = "META-INF/simple-rpc/";/** 默认扩展名缓存 */private final String defaultNameCache;private ExtensionLoader(Class<T> type) {this.type = type;SimpleRpcSPI annotation = type.getAnnotation(SimpleRpcSPI.class);defaultNameCache = annotation.value();}
这里就是各种缓存,具体的就不说了,注释里面很详细,有点就是默认值是由注解里面的value值去获取的,这里都是有默认初始值的;
能使用SPI机制的接口都放在了simple-rpc-common
模块里面:
@SimpleRpcSPI(value = "default")public interface Compressor {...}@SimpleRpcSPI(value = "simple-rpc-property")public interface ConfigLoader {...}@SimpleRpcSPI(value = "redis")public interface RegisterCenter {...}@SimpleRpcSPI(value = "protostuff")public interface Serializer {...}@SimpleRpcSPI("round")public interface SimpleRpcLoadBalance {...}
接下来就是尝试去获取扩展加载器:
/*** 获取对应类型的扩展加载器实例** @param type 扩展类加载器的类型* @return 扩展类加载器实例*/public static <S> ExtensionLoader<S> getLoader(Class<S> type) {扩展类型必须是接口if (!type.isInterface()) {throw new IllegalStateException(type.getName() + " is not interface");}SimpleRpcSPI annotation = type.getAnnotation(SimpleRpcSPI.class);if (annotation == null) {throw new IllegalStateException(type.getName() + " has not @SimpleRpcSPI annotation.");}ExtensionLoader<?> extensionLoader = extensionLoaderCache.get(type);if (extensionLoader != null) {noinspection uncheckedreturn (ExtensionLoader<S>) extensionLoader;}extensionLoader = new ExtensionLoader<>(type);extensionLoaderCache.putIfAbsent(type, extensionLoader);noinspection uncheckedreturn (ExtensionLoader<S>) extensionLoader;}
扩展类加载器这里定义的是static缓存,全局共用,先做一些前置的拦截校验,判断是否是@SimpleRpcSPI
标记的类,是的话继续往下走;尝试从缓存中拿数据,如果不存在则返回;缓存可以防止频繁创建加载器;比如加载Serializer
的SPI,会尝试从缓存中拿出来:
ExtensionLoader.getLoader(Serializer.class).getExtension("json");
有了对应类型的SPI加载器,接下来就是加载对应的SPI的实现类,简单看下逻辑:
/*** 根据名字获取扩展类实例(单例)** @param name 扩展类在配置文件中配置的名字. 如果名字是空的或者空白的,则返回默认扩展* @return 单例扩展类实例,如果找不到,则抛出异常*/public T getExtension(String name) {if (StrUtil.isBlank(name)) {return getDefaultExtension();}从缓存中获取单例T extension = extensionsCache.get(name);if (extension == null) {Object lock = createExtensionLockMap.computeIfAbsent(name, k -> new Object());noinspection SynchronizationOnLocalVariableOrMethodParametersynchronized (lock) {extension = extensionsCache.get(name);if (extension == null) {extension = createExtension(name);extensionsCache.put(name, extension);}}}return extension;}
这里有几点:
•如果是未指定扩展名,那么选用默认的扩展类,这里是有simple-rpc-core自己提供实现;•这里同样使用缓存,尝试从指定类型的扩展类缓存中拿到对应类型的扩展类,如果不存在的话加锁尝试再次获取;获取不到则创建新的实例;此处返回的是单例;
然后就是创建实例的过程,也比较简单:
private T createExtension(String name) {获取当前类型所有扩展类Map<String, Class<?>> extensionClasses = getAllExtensionClasses();再根据名字找到对应的扩展类Class<?> clazz = extensionClasses.get(name);if (clazz == null) { throw new ...}...return (T) clazz.newInstance();...}private Map<String, Class<?>> getAllExtensionClasses() {Map<String, Class<?>> extensionClasses = extensionClassesCache.get();if (extensionClasses != null) {return extensionClasses;}synchronized (extensionClassesCache) {extensionClasses = extensionClassesCache.get();if (extensionClasses == null) {extensionClasses = loadClassesFromResources();extensionClassesCache.set(extensionClasses);}}return extensionClasses;}private Map<String, Class<?>> loadClassesFromResources() {Map<String, Class<?>> extensionClasses = new ConcurrentHashMap<>();扩展配置文件名String fileName = EXTENSION_PATH + type.getName();拿到资源文件夹ClassLoader classLoader = ExtensionLoader.class.getClassLoader();try {Enumeration<URL> resources = classLoader.getResources(fileName);while (resources.hasMoreElements()) {URL url = resources.nextElement();try (BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {开始读文件while (true) {String line = reader.readLine();if (line == null) {break;}parseLine(line, extensionClasses);}}}...return extensionClasses;}private void parseLine(String line, Map<String, Class<?>> extensionClasses) throws ClassNotFoundException {line = line.trim();忽略#号开头的注释if (line.startsWith("#")) {return;}String[] kv = line.split("=");if (kv.length != 2 || kv[0].length() == 0 || kv[1].length() == 0) {throw new IllegalStateException("Extension file parsing error. Invalid format!");}if (extensionClasses.containsKey(kv[0])) {throw new IllegalStateException(kv[0] + " is already exists!");}Class<?> clazz = ExtensionLoader.class.getClassLoader().loadClass(kv[1]);extensionClasses.put(kv[0], clazz);}
调用链路:createExtension
→ getAllExtensionClasses
→ loadClassesFromResources
→ parseLine
;
四个方法简单分析下:
•createExtension
:这里面会尝试从缓存extensionClassesCache
里面拿到Class,拿到之后创建对应的实例;•getAllExtensionClasses
:就是将获取的数据塞到缓存里面;•loadClassesFromResources
:这里的话是从资源文件里面去获取对应的数据,然后逐行解析;这里的文件名就是对应的SPI接口名,比如是序列化的,那么文件路径就是:META-INF/simple-rpc/com.simple.rpc.common.interfaces.Serializer
;•parseLine
:这个就是json=com.simple.rpc.test.core.common.test.spi.JSONSerializer
这段解析出来,然后value存到缓存里面即可;
SPI测试
测试代码在com.simple.rpc.test.ExtensionLoaderTest中:
@Testpublic void testSerialize() {System.out.println(ExtensionLoader.getLoader(Serializer.class).getExtension("json"));System.out.println(ExtensionLoader.getLoader(RegisterCenter.class).getExtension("mysql"));System.out.println(ExtensionLoader.getLoader(ConfigLoader.class).getExtension("spi-property"));}

一般SPI接口是需要放在公共的地方,全系统是保持一致的;
注册中心
上面也说了,本来是打算单独一个服务:simple-rpc-register,但是时间受限和考虑维度暂时没想好问题,所以目前是搁置了;之后思考一下在考虑单独拿出来,这里先讲讲注册相关的逻辑;
基本方法
注册中心不复杂,就是存生产者提供的服务的元信息:
@SimpleRpcSPI(value = "redis")public interface RegisterCenter {void init(SimpleRpcUrl url);Boolean register(RegisterInfo request);String get(RegisterInfo request);Boolean unregister(HookEntity hookEntity);}
•init
:初始化接口,就想初始化jedis连接、mysql连接等;提供其连接后的存储能力;•register
:注册信息,这里面就是一些接口的基本信息和服务端的连接信息,可以具体看看代码里面的实体字段;•get
:获取对应的信息,等会后面一起写下数据格式;其实就只在设计数据格式,然后存取的操作;•unregister
:这个就是注销所有的注册信息,就是删除对应的注册元数据信息;
数据格式
存储分为三个部分:
•key
:这里是接口权限定名+ "_" + 别名,com.simple.rpc.HelloService_helloService
;•host_port
:类似于子key,存储的是服务端url的信息,127.0.0.1_41200
;•registerInfo
:注册的元数据信息,出上面两个key信息之外,还会有一些属性配置;
/*** 将三个字段构建然后存入:* --- 127.0.0.1_41201 --- {"alias":"xxx","host":"127.0.0.1","port":41201,"serializer":"serializer","weights":20}* -* com.simple.rpc.HelloService_helloService --- 127.0.0.1_41202 --- {"alias":"xxx","host":"127.0.0.1","port":41202,"serializer":"serializer","weights":30}* -* --- 127.0.0.1_41203 --- {"alias":"xxx","host":"127.0.0.1","port":41203,"serializer":"serializer","weights":50}**/
接口实现
一些基本的前置构建进行了抽象:com.simple.rpc.core.register.AbstractRegisterCenter
注册接口
@Overridepublic Boolean register(RegisterInfo request) {String key = request.getInterfaceName() + SymbolConstant.UNDERLINE + request.getAlias();String fieldKey = request.getHost() + SymbolConstant.UNDERLINE + request.getPort();String value = JSON.toJSONString(request);return buildDataAndSave(key, fieldKey, value);}// 这里由子类取实现存操作protected abstract Boolean buildDataAndSave(String key, String hostPort, String request);
获取服务接口
@Overridepublic String get(RegisterInfo request) {String key = request.getInterfaceName() + SymbolConstant.UNDERLINE + request.getAlias();Map<String, String> stringStringMap = getLoadBalanceData(key);负载均衡策略String rule = Objects.isNull(request.getLoadBalanceRule()) ? LoadBalanceRule.ROUND.getName() : request.getLoadBalanceRule();return ExtensionLoader.getLoader(SimpleRpcLoadBalance.class).getExtension(rule).loadBalance(stringStringMap);}/*** 数据格式:{"127.0.0.1_41200" : "{"requestId: 1"}"}* 描述:通过 key(com.simple.rpc.AService_aService)获取的到 下面的map格式* map格式:前面的key以 host + "_" + port 组成;后面是对应的request信息的json格式** @param key* @return*/protected abstract Map<String, String> getLoadBalanceData(String key);
这里的话就是加入了负载算法,其他的也就是从存数据的地方根据制定的数据格式,拿到对应的注册信息;关于负载算法,之后可以单独说;
这里是支持SPI机制来实现不同的负载均衡的,当然,rpc-core里面也提供了多种负载算法;
工厂提供接口
public class RegisterCenterFactory {public static RegisterCenter create(String registerType) {return getRegisterCenter(registerType);}private static RegisterCenter getRegisterCenter(String registerType) {根据SPI找到对应的注册中心,其实这里是会根据address进行解析然后判断是哪种类型;return ExtensionLoader.getLoader(RegisterCenter.class).getExtension(registerType);}}
从这就可以拿到对应的注册中心,然后在进行其他操作,这里放到之后整合springboot的时候在结合起来看看;
注销服务信息
@Overridepublic Boolean unregister(HookEntity hookEntity) {List<String> rpcServiceNames = hookEntity.getRpcServiceNames();String fieldKey = hookEntity.getServerUrl() + SymbolConstant.UNDERLINE + hookEntity.getServerPort();AtomicReference<Long> hdel = new AtomicReference<>(0L);rpcServiceNames.forEach(name -> {hdel.set(jedis.hdel(name, fieldKey));});return hdel.get() > 0;}
这里是redis为注册中心的实现,就是删除信息;没什么可讲的;之后只需要挂在springboot容器退出的时候去调用即可;
抽象类里面的子实现这里就不把代码粘贴上来了;
总而言之,注册中心就是:**注册元信息
、获取注册元信息
、注销元信息
**;
配置中心
配置加载器
这里rpc提供自定义配置能力的地方;同样提供SPI机制:
@SimpleRpcSPI(value = "simple-rpc-property")public interface ConfigLoader {*** 加载配置项** @param key 配置的 key* @return 配置项的值,如果不存在,返回 null*/String loadConfigItem(String key);}
这是loader提供的能力,也就是通过key获取value;
系统提供了两个子实现:
•PropertiesConfigLoader
:setting = SettingUtil.get("simple-rpc.properties");
使用hutool的工具类从指定文件名里面拿到对应的配置信息;return setting.getStr(key);
•SystemPropertyLoader
:System.getProperty(key);
这个比较简单,就是直接拿系统相关的参数;
配置管理器
这里面的话就提供了文件加载、行数据加载、实体映射等;
构造器就是在做初始化操作:将目前系统提供的加载器放到缓存里面,然后在loadConfigItem
方法里面轮询获取,然后优先顺序是:system-property
> simple-rpc-property
> spi-property
;前者为空的情况下,会顺延;之后测试一下;
private ConfigManager() {按照优先级放好List<String> configLoaderNames = Arrays.asList("system-property", "simple-rpc-property", "spi-property");configLoaders = new ArrayList<>(configLoaderNames.size());for (String loaderName : configLoaderNames) {ConfigLoader configLoader = ExtensionLoader.getLoader(ConfigLoader.class).getExtension(loaderName);configLoaders.add(configLoader);}}/*** 获取配置项** @param key 配置项的 key* @return 如果获取不到,返回 null*/public String loadConfigItem(String key) {按照优先级,先获取到就返回for (ConfigLoader configLoader : configLoaders) {String value = configLoader.loadConfigItem(key);if (value != null) {return value;}}return null;}
下面代码做的是缓存操作,根据各个不同的配置类,返回不同的配置类加载器:
/*** 加载配置,有缓存** @param clazz 配置类型* @param <T> 类型* @return 配置实体类*/@SuppressWarnings("unchecked")public <T> T loadConfig(Class<T> clazz) {T config = (T) configCache.get(clazz);if (config == null) {config = loadAndCreateConfig(clazz);configCache.put(clazz, config);}return config;}
继续看看实体映射:
/*** 加载并创建配置类** @param clazz 类型 class* @param <T> 类型* @return 配置类,load 不到的字段为 null*/private <T> T loadAndCreateConfig(Class<T> clazz) {SimpleRpcConfig configAnnotation = clazz.getAnnotation(SimpleRpcConfig.class);if (configAnnotation == null) {throw new IllegalStateException("config class " + clazz.getName() + " must has @SimpleRpcConfig annotation");}String prefix = configAnnotation.prefix();if (StrUtil.isBlank(prefix)) {throw new IllegalArgumentException("config class " + clazz.getName() + "@SimpleRpcConfig annotation must has prefix");}try {T configObject = clazz.newInstance();for (Field field : clazz.getDeclaredFields()) {忽略掉静态的if (Modifier.isStatic(field.getModifiers())) {continue;}String configKey = prefix + "." + field.getName();String value = loadConfigItem(configKey);if (value == null) {continue;}Object convertedValue = Convert.convert(field.getType(), value);field.setAccessible(true);field.set(configObject, convertedValue);}return configObject;} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);}}
这里简单讲下几个操作:
•首先是拿到类上的@SimpleRpcConfig
注解,用来判断其是否是配置类;•拿到前缀信息,跟springboot里面的配置类使用方式其实差不多;•创建该配置类,然后遍历其所有的字段,通过String value = loadConfigItem(configKey);
获取从配置文件里面加载出来的key进行赋值;
整个逻辑就差不多是这样;
配置加载器测试
加载测试
在resources
资源目录下面新建文件simple-rpc.properties
:
simple.rpc.register.address=redis://127.0.0.1:6379simple.rpc.register.password=123456simple.rpc.base.loadBalanceRule=roundsimple.rpc.base.timeout=10000simple.rpc.base.retryNum=10
然后新建测试类:com.simple.rpc.test.ConfigLoaderTest
,然后测试注册信息的获取:
@Testpublic void getRegistryConfig() {RegistryConfig registryConfig = ConfigManager.getInstant().loadConfig(RegistryConfig.class);System.out.println(registryConfig);System.out.println(SimpleRpcUrl.toSimpleRpcUrl(registryConfig));}
可以看看结果:

目前是已经加载到了信息;
加载器的优先级
@Testpublic void getBaseConfig() {BaseConfig baseConfig = ConfigManager.getInstant().loadConfig(BaseConfig.class);System.out.println(baseConfig);}
看看结果:

前面代码里面也看到过,SPI
的优先级最低(这里先不关心SPI实现),所以最先加载,下面的输出就是加载RPC配置文件的结果,所以优先级就是如此,但是后面整合springboot的时候,几乎都是使用其默认的配置类,但是这个也可以作为一个扩展点,灵活性更高;
负载均衡
思路分析
simple-rpc的负载思路是:从注册中心区注册信息的那里进行处理,从多个url中选取一个,同样是抽象类里面做大部分逻辑:
public abstract class AbstractLoadBalance implements SimpleRpcLoadBalance {@Overridepublic String loadBalance(Map<String, String> services) {if (CollectionUtil.isEmpty(services)) {return null;}Map<String, LoadBalanceParam> selectMap = new ConcurrentHashMap<>(4);Set<String> urls = services.keySet();for (String url : urls) {Request request = JSON.parseObject(services.get(url), Request.class);LoadBalanceParam param = new LoadBalanceParam();param.setWeights(request.getWeights());selectMap.put(url, param);}String selectUrl = select(selectMap);return services.get(selectUrl);}*** 实际的负载算法** @param urls* @return*/public abstract String select(Map<String, LoadBalanceParam> urls);}
简单解析一下方法:
•这里是去构建一个key = 127.0.0.1_41201,value= new LoadBalanceParam;
的map,这里是想着之后可以扩展,一些负载算法需要依赖于一些值,都可以放到后面这个实体里面传给对应的SPI实现;•然后关于url的选择,则由具体的算法实现;下面就简单分析一下几种不同的负载策略;
负载算法分析
这里的话,简单的就不分析了,也就是round
、random
;
轮询权重算法
算法思路:就是根据权重拉长范围,然后根据随机数分配,想当于权重大的被随机到的概率也越大;
public class RandomWeightRule extends AbstractLoadBalance {/*** 权重:* - 127.0.0.1_8881: 20* - 127.0.0.1_8882: 30* - 127.0.0.1_8883: 50* <p>* 拉长范围:|0 -- 127.0.0.1_8881 -- 20 | 21 —- 127.0.0.1_8882 -- 50 | 51 -- 127.0.0.1_8883 -- 100|** @param urls* @return*/@Overridepublic String select(Map<String, LoadBalanceParam> urls) {int range = 0;for (LoadBalanceParam value : urls.values()) {range += value.getWeights();}int index = RandomUtil.randomInt(range);for (String url : urls.keySet()) {LoadBalanceParam param = urls.get(url);Integer weights = param.getWeights();if (index < weights) {return url;}index -= weights;}return "";}}
这里分析几个案例(注释里面的信息):
•随机数10,然后遍历到的127.0.0.1_8881: 20
,这个时候index(10) < weights(20)
符合直接返回;•随机数40,第一次遍历就是上面情况,不符合,然后计算index(40) -= weights(20)
这个时候开始第二轮,此时index = 40 - 20 = 20
,20是第一次url的权重,然后再次判断,index(20) < weights(30)
符合;证明轮询到了127.0.0.1_8882: 30
;•随机数70,根据上面两种情况,这里相减两次,最后符合条件已经是第三轮轮询,返回的则是127.0.0.1_8883: 50
普通的加权轮询算法
负载思路:其实这个也简单,就是求对应几个服务权重的最小公约数,然后根据用 权重 / 最小公约数 = 轮询次数;这里就是比较粗暴,现在前面的机器如果权重过大,可能大部分负载都压到该机器;如果权重是321这里的效果就是AAABBC
;
public class NormalWeightRoundRule extends AbstractLoadBalance {private static Map<String, Integer> urlMap = new ConcurrentHashMap<>(4);/*** 权重:* - 127.0.0.1_8881: 20* - 127.0.0.1_8882: 30* - 127.0.0.1_8883: 50* <p>* 找到最小公约数:转换成 url : 20 / 10 = 2 这种数据格式,然后遍历map,在减去对应的次数** @param urls* @return*/@Overridepublic String select(Map<String, LoadBalanceParam> urls) {List<Integer> weights = new ArrayList<>(10);for (String url : urls.keySet()) {weights.add(urls.get(url).getWeights());}int maxGys = ngcd(weights, weights.size());if (CollectionUtil.isEmpty(urlMap)) {// 将权重除以最小公约数urls.forEach((url, param) -> urlMap.put(url, param.getWeights() / maxGys));}for (String url : urlMap.keySet()) {Integer num = urlMap.get(url);if (num > 0) {urlMap.put(url, --num);return url;} else {urlMap.remove(url);if (CollectionUtil.isEmpty(urlMap)) {// 将权重除以最小公约数urls.forEach((url1, param) -> urlMap.put(url1, param.getWeights() / maxGys));ArrayList<String> urlFirst = new ArrayList<>(urlMap.keySet());Integer numFirst = urlMap.get(urlFirst.get(0));urlMap.put(urlFirst.get(0), --numFirst);return urlFirst.get(0);}}}return "";}public static int gcd(int x, int y) {return y == 0 ? x : gcd(y, x % y);}public static int ngcd(List<Integer> target, int z) {if (z == 1) {//真正返回的最大公约数return target.get(0);}//递归调用,两个数两个数的求return gcd(target.get(z - 1), ngcd(target, z - 1));}}
这里其实就两点:
•首先是通过递归求出权重的最小公约数•拿到最小公约数后开始构建 key = url,value = 次数
的map,然后用于遍历,每次数量减1,全部轮询结束后重置缓存;
平滑加权轮询
负载思路:这个算法的话,其实是保证在一个周期内服务根据权重被轮询到的总次数是一致的,但是在周期内是随机的,跟上面一样,如果权重是321,那么这里的效果就是ABACBA
;
public class SmoothWeightRoundRule extends AbstractLoadBalance {private static Map<String, ServiceWeight> weightPathMap = new ConcurrentHashMap<>(4);/*** 权重:* - 127.0.0.1_8881: 3* - 127.0.0.1_8882: 2* - 127.0.0.1_8883: 1* <p>* |------------|---------------|------------|----------------|--------------------------------|* | requestNum | currentWeight | currentMax | url | max(currentWeight)-totalWeight |* |------------|---------------|------------|----------------|--------------------------------|* | 1 | 3,2,1 | 3 | 127.0.0.1_8881 | -3,2,1 |* |------------|---------------|------------|----------------|--------------------------------|* | 2 | 0,2,1 | 2 | 127.0.0.1_8882 | 0,-4,1 |* |------------|---------------|------------|----------------|--------------------------------|* | 3 | 3,-2,1 | 3 | 127.0.0.1_8881 | -3,-2,1 |* |------------|---------------|------------|----------------|--------------------------------|* | 4 | 0,0,2 | 2 | 127.0.0.1_8883 | 0,0,-5 |* |------------|---------------|------------|----------------|--------------------------------|* | 5 | 3,2,-4 | 3 | 127.0.0.1_8881 | -3,2,-4 |* |------------|---------------|------------|----------------|--------------------------------|* | 6 | 0,4,-3 | 4 | 127.0.0.1_8882 | 0,-2,-3 |* |------------|---------------|------------|----------------|--------------------------------|* <p>* - 每次从当前权重中选出最大权重代表的服务器作为返回结果* - 选择完具体服务器后,把当前最大权重减去权重总和,再把所有权重都跟初始权重(3,2,1)相加* - 3,2,1,选出3代表的127.0.0.1_8881服务器,然后减去6成为 -3,2,1,再加上3,2,1,成为0,2,1,下次的当前权重即为0,2,1** @param urls* @return*/@Overridepublic String select(Map<String, LoadBalanceParam> urls) {return getServerPath(urls);}/*** 每个url对应一个权重** @param urls* @return*/public static String getServerPath(Map<String, LoadBalanceParam> urls) {// 拿到总权重int range = 0;for (LoadBalanceParam value : urls.values()) {range += value.getWeights();}// 动态权重Map为空,那么初始化端口权重mapif (weightPathMap.isEmpty()) {urls.forEach((url, param) -> weightPathMap.put(url, new ServiceWeight(url, param.getWeights(), 0)));}// 当前权重 + 初始权重 = 下一轮的权重for (ServiceWeight serviceWeight : weightPathMap.values()) {serviceWeight.curWeight += serviceWeight.weight;}// 将最大的权重赋值ServiceWeight maxCurWeight = null;for (ServiceWeight weight : weightPathMap.values()) {// 找最大的权重值if (maxCurWeight == null || weight.curWeight > maxCurWeight.curWeight) {maxCurWeight = weight;}}// 减去总权重assert maxCurWeight != null;maxCurWeight.curWeight -= range;// 拿到端口return maxCurWeight.url;}}
大致就是下面几个步骤:
•每次从当前权重中选出最大权重代表的服务器作为返回结果•选择完具体服务器后,把当前最大权重减去权重总和,再把所有权重都跟初始权重(3,2,1)相加•3,2,1,选出3代表的127.0.0.1_8881服务器,然后减去6成为 -3,2,1,再加上3,2,1,成为0,2,1,下次的当前权重即为0,2,1
测试的话代码里面有,这就不粘贴上来了,然后负载均衡也是支持SPI机制的,作用于从注册中心获取注册信息的时候,会根据url进行选取;
压缩、序列化
这里的话,压缩是默认没有使用的,也就是直接依赖于序列化工具,但是rpc-core里面有实现GZIP
;序列化默认是使用protostuff
,这个序列化性能还是不错的;
这里简单提下,直接实现对应的接口,就可以通过SPI机制加载指定的压缩工具或者序列化工具:
压缩
@SimpleRpcSPI(value = "default")public interface Compressor {byte[] compress(byte[] bytes);byte[] decompress(byte[] bytes);}
序列化
@SimpleRpcSPI(value = "protostuff")public interface Serializer {byte[] serialize(Object object);<T> T deserialize(byte[] bytes, Class<T> clazz);}
在simple-rpc-test-common
模块里面有对应的SPI实现:com.simple.rpc.test.common.starter.spi.JSONSerializer
RPC协议和通讯
这里大部分都算是netty的东西,先从服务端、客户端的启动说说,然后在到代理反射,最后就是双端通讯过程的编解码和handler处理;
netty双端
服务端和客户端的设计理念都是实现Runnable
,然后核心就在run方法里面,先说说client端的启动:
public RpcServerSocket(Request request) {this.request = request;}
这里构建request,里面包含心跳时间,服务端的host和port,然后就是启动client;run方法里面的核心内容:
// 选用客户端的启动器Bootstrap b = new Bootstrap();b.group(workerGroup);b.channel(NioSocketChannel.class);b.option(ChannelOption.AUTO_READ, true);b.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);b.option(ChannelOption.TCP_NODELAY, Boolean.TRUE);b.handler(new ChannelInitializer<SocketChannel>() {...});// 进行服务器连接ChannelFuture f = b.connect(host, port).sync();this.future = f;f.channel().closeFuture().sync();
关于那些参数什么意思,等下说server端的时候再讲,这里面比较重要的一个地方this.future = f;
这里的话在调用client启动的时候,会轮询去判断channel是否已经创建,这里也就是client端的重试次数,这里也仅仅用来判断连接是否成功,然后每次会讲此次的channelFuture
缓存起来;
服务端的话,直接看代码:
@Overridepublic void run() {Long stopConnectTime = Objects.isNull(request.getStopConnectTime()) || request.getStopConnectTime() <= 0 ?30 : request.getStopConnectTime();// 这里如果自己不指定线程数,默认是当前cpu的两倍EventLoopGroup dealConnGroup = new NioEventLoopGroup(128);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(dealConnGroup, workerGroup).channel(NioServerSocketChannel.class)// 系统用于临时存放已完成三次握手的请求的队列的最大长度。如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数.option(ChannelOption.SO_BACKLOG, 128)// 程序进程非正常退出,内核需要一定的时间才能够释放此端口,不设置 SO_REUSEADDR 就无法正常使用该端口。.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)// TCP/IP协议中针对TCP默认开启了Nagle 算法。// Nagle 算法通过减少需要传输的数据包,来优化网络。在内核实现中,数据包的发送和接受会先做缓存,分别对应于写缓存和读缓存。// 启动 TCP_NODELAY,就意味着禁用了 Nagle 算法,允许小包的发送。// 对于延时敏感型,同时数据传输量比较小的应用,开启TCP_NODELAY选项无疑是一个正确的选择.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {...});// 默认启动初始端口int port = Objects.isNull(request.getPort()) || request.getPort() <= 0 ? 41200 : request.getPort();while (NetUtil.isPortUsing(port)) {port++;}LocalAddressInfo.LOCAL_HOST = NetUtil.getHost();LocalAddressInfo.PORT = port;//注册服务this.f = bootstrap.bind(port).sync();// 返回请求等待结果,异步监听事件this.f.channel().closeFuture().sync();...}
服务端也很简单,上面注释上面对各个参数都做了很详细的解释,这里会用的LocalAddressInfo
对象缓存服务端的启动信息,端口由自己定义,通过配置传递进来;
所以这里也比较简单,然后只需要开个线程调用即可;
通讯和RPC协议
编解码
这里先说下自定义的编解码协议,在使用handler的时候会走自己定义的编解码协议,首先看看编码:首先是RpcMessageEncoder extends MessageToByteEncoder<RpcMessage>
:
public class RpcMessageEncoder extends MessageToByteEncoder<RpcMessage> {@Overrideprotected void encode(ChannelHandlerContext ctx, RpcMessage rpcMessage, ByteBuf out) {// 2B magic code(魔数)out.writeBytes(MessageFormatConstant.MAGIC);// 1B version(版本)out.writeByte(MessageFormatConstant.VERSION);// 4B full length(消息长度). 总长度先空着,后面填。out.writerIndex(out.writerIndex() + MessageFormatConstant.FULL_LENGTH_LENGTH);// 1B messageType(消息类型)out.writeByte(rpcMessage.getMessageType());// 1B codec(序列化类型)out.writeByte(rpcMessage.getSerializeType());// 1B compress(压缩类型)out.writeByte(rpcMessage.getCompressTye());// 8B requestId(请求的Id)out.writeLong(rpcMessage.getRequestId());// 写 body,返回 body 长度int bodyLength = writeBody(rpcMessage, out);// 当前写指针int writerIndex = out.writerIndex();out.writerIndex(MessageFormatConstant.MAGIC_LENGTH + MessageFormatConstant.VERSION_LENGTH);// 4B full length(消息长度)out.writeInt(MessageFormatConstant.HEADER_LENGTH + bodyLength);// 写指针复原out.writerIndex(writerIndex);}}
这里就是按照指定的格式进行编码:
•2B magic
(魔数)•1B version
(版本)•4B full length
(消息长度)•1B messageType
(消息类型)•1B serialize
(序列化类型)•1B compress
(压缩类型)•8B requestId
(请求的Id)•body
(object类型数据)
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18+-----+-----+-------+----+----+----+----+-----------+---------+--------+----+----+----+----+----+----+----+---+| magic |version| full length |messageType|serialize|compress| RequestId |+-----+-----+-------+----+----+----+----+-----------+----- ---+--------+----+----+----+----+----+----+----+---+| || body || || ... ... |+-------------------------------------------------------------------------------------------------------------+
这里的消息长度开始只是改变指针的位置,后面写body之后会计算字节数,在设置此次的消息总长度;
然后就是writeBody
方法:
private int writeBody(RpcMessage rpcMessage, ByteBuf out) {byte messageType = rpcMessage.getMessageType();// 如果是 ping、pong 心跳类型的,没有 body,直接返回头部长度if (messageType == MessageType.HEARTBEAT.getValue()) {return 0;}// 序列化器SerializeType serializeType = SerializeType.fromValue(rpcMessage.getSerializeType());if (serializeType == null) {throw new IllegalArgumentException("codec type not found");}Serializer serializer = ExtensionLoader.getLoader(Serializer.class).getExtension(serializeType.getName());// 压缩器CompressType compressType = CompressType.fromValue(rpcMessage.getCompressTye());if (compressType == null) {throw new IllegalArgumentException("compress type not found");}Compressor compressor = ExtensionLoader.getLoader(Compressor.class).getExtension(compressType.getName());// 序列化byte[] notCompressBytes = serializer.serialize(rpcMessage.getData());// 压缩byte[] compressedBytes = compressor.compress(notCompressBytes);// 写 bodyout.writeBytes(compressedBytes);return compressedBytes.length;}
简单解析下:
•首先是判断是否是心跳消息,如果是的话,就直接跳过,不写body体;•然后根据传进来的序列化、压缩类型在通过SPI机制进行加载,然后先进行序列化,在进行压缩操作,这里就返回压缩后的字节数;
编码就是这些,继续看看解码操作;
public class RpcMessageDecoder extends LengthFieldBasedFrameDecoder {public RpcMessageDecoder() {/*** 继承定长解码器,自己完成构造数据后,每次会将一次请求的消息一起解析,不会出现粘包、拆包* 这里的话假设数据是(二进制不补零):52 1 0001 1 1 00000001 1* maxFrameLength: 8M* lengthFieldOffset:3* lengthFieldLength:4* lengthAdjustment:-7* initialBytesToStrip:0** 这里解析比较简单 首先是长度偏移量在 3字节后面,也就是跳过了 52 1;* 然后长度为 0001,对应4个字节;* 接下来就是调整消息读取位置,又到最前面去了,然后读取的初始位置就是0,从头还是读,所以52 1 0001 1 1 00000001 1都读取了*/super(// 最大的长度,如果超过,会直接丢弃MAX_FRAME_LENGTH,// 描述长度的字段[4B full length(消息长度)]在哪个位置:在 [2B magic(魔数)]、[1B version(版本)] 后面MAGIC_LENGTH + VERSION_LENGTH,// 描述长度的字段[4B full length(消息长度)]本身的长度,也就是 4B 啦FULL_LENGTH_LENGTH,// LengthFieldBasedFrameDecoder 拿到消息长度之后,还会加上 [4B full length(消息长度)] 字段前面的长度// 因为我们的消息长度包含了这部分了,所以需要减回去-(MAGIC_LENGTH + VERSION_LENGTH + FULL_LENGTH_LENGTH),// initialBytesToStrip: 去除哪个位置前面的数据。因为我们还需要检测 魔数 和 版本号,所以不能去除0);}@Overrideprotected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {Object decoded = super.decode(ctx, in);if (decoded instanceof ByteBuf) {ByteBuf frame = (ByteBuf) decoded;if (frame.readableBytes() >= HEADER_LENGTH) {try {return decodeFrame(frame);} catch (Exception ex) {SimpleRpcLog.error("Decode frame error.", ex);} finally {frame.release();}}}return decoded;}}
这里就涉及到了netty的定长解码器,这里的话不多说,注释里面写的挺详细的;可以看看文章:
https://zhuanlan.zhihu.com/p/95621344
https://www.cnblogs.com/crazymakercircle/p/10294745.html
继承了定长解码器,我们只需要关心我们每次接收的信息了,从版本开始解析,然后会做个消息的长度校验操作:frame.readableBytes() >= HEADER_LENGTH
;接下来就是解码:decodeFrame(frame);
readAndCheckMagic(in);readAndCheckVersion(in);int fullLength = in.readInt();byte messageType = in.readByte();byte codec = in.readByte();byte compress = in.readByte();long requestId = in.readLong();...// 是心跳则直接返回if (messageType == MessageType.HEARTBEAT.getValue()) {return rpcMessage;}// 计算读取长度int bodyLength = fullLength - HEADER_LENGTH;if (bodyLength == 0) {return rpcMessage;}// 读取数据byte[] bodyBytes = new byte[bodyLength];in.readBytes(bodyBytes);
这里的话粘贴了核心代码,拿到了消息体,后面就是判断序列化、解压类型了,然后就是逆流程了;
整个编解码协议就是这些了;
通讯工具类
就4个类,完成消息的发送:
├── send│ ├── SyncWrite.java│ ├── SyncWriteFuture.java│ ├── SyncWriteMap.java│ └── WriteFuture.java
•WriteFuture
:这个类是一个异步接口,继承于java并发包里面的Future
;除了其自带的一些方法,扩展了是否写成功、设置写结果、设置响应值、此次请求id等方法;•SyncWriteMap
:这个其实就是个缓存,讲此次请求产生的异步类WriteFuture
缓存起来,用请求id作为key,每次请求结束后删除此次请求操作;•SyncWriteFuture
:这个类其实可以看看,也就是WriteFuture
的实现类;
•是否超时,这里根据isTimeout
方法进行判断;•get
获取响应的时候,会去尝试拿到锁;
private CountDownLatch latch = new CountDownLatch(1);if (latch.await(timeout, unit)) {return response;}
上面会发生锁等待,然后在处理完结果后,写回响应的时候,会去释放锁,然后就能获取到响应值
public void setResponse(Response response) {this.response = response;// 设置好了响应值之后,这里释放锁latch.countDown();}
然后其他的就没什么了,都是一些基本的方法;
•
SyncWrite
:这个方法里面就是封装了一层,其实底层就是channel.writeAndFlush
写消息操作;这里大致讲下其流程;
可以看看代码来说:
public Response writeAndSync(Channel channel, Request request, Long timeout) throws Exception {if (channel == null) {throw new NettyInitException("channel is null, please init channel");}if (request == null) {throw new NullPointerException("request");}if (timeout <= 0) {timeout = 30L;}long requestId = MessageFormatConstant.REQUEST_ID.incrementAndGet();request.setRequestId(requestId);// 记录此次请求id,并放入到缓存中WriteFuture<Response> future = new SyncWriteFuture(request.getRequestId());SyncWriteMap.syncKey.put(request.getRequestId(), future);// 构建请求数据RpcMessage rpcMessage = new RpcMessage();rpcMessage.setMessageType(MessageType.REQUEST.getValue());rpcMessage.setRequestId(requestId);byte serializer = SerializeType.fromName(request.getSerializer()).getValue();rpcMessage.setSerializeType(serializer);byte compressor = CompressType.fromName(request.getCompressor()).getValue();rpcMessage.setCompressTye(compressor);rpcMessage.setData(request);// 同步写数据Response response = doWriteAndSync(channel, rpcMessage, timeout, future);// 拿到响应值后,此前请求结束,那么可以移除此次请求SyncWriteMap.syncKey.remove(request.getRequestId());return response;}
这里大致分为几个步骤:- 参数校验- 通过原子自增获取请求id,然后构建此次请求的`WriteFuture`,并存入到缓存中;- 然后就是构建`RpcMessage`,这个是Rpc通信协议的实体,这里构建了`序列化类型`、`压缩类型`、`请求id`等,然后将请求数据写入,然后标记此次消息类型为`请求类型`;- 封装的写操作:`doWriteAndSync`,真正发送在这里,假设发送成功则会得到结果,然后将此次请求缓存的数据清楚;然后看看`doWriteAndSync`方法:
private Response doWriteAndSync(final Channel channel, final RpcMessage rpcMessage, final long timeout, final WriteFuture<Response> writeFuture) throws Exception {// 这里就不用lambda了,这里就是在channel写出一条数据之后,可以为其添加一个监听时间,也即操作完之后的一个回调方法// 每个 Netty 的出站 I/O 操作都将返回一个 ChannelFuturechannel.writeAndFlush(rpcMessage).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) {// 设置此次请求的状态writeFuture.setWriteResult(future.isSuccess());// 如果失败,此次的原因writeFuture.setCause(future.cause());// 失败移除此次请求if (!writeFuture.isWriteSuccess()) {SyncWriteMap.syncKey.remove(writeFuture.requestId());}}});// 请求完成之后,这里会去模拟等待,get的时候是无法去拿到资源的,这里设置一个等待事件Response response = writeFuture.get(timeout, TimeUnit.SECONDS);if (response == null) {// 已经超时则抛出异常if (writeFuture.isTimeout()) {throw new TimeoutException();...}// 否则返回响应,此次类似 feign的调用,等到请求,过了一段时间还没有拿到结果,则抛出超时异常,否则成功return response;}
这里同样简单分析一下:- 首先是真正的写操作`channel.writeAndFlush(rpcMessage)`;然后会为写操作做个监听,设置一些状态信息;- 然后就是等待获取请求的结果,这里get等待;这里需要结合两个handler看,client-handler里面获取服务端的响应后会去设置,下面跟着心跳检测看看两个handler的内容;
双handler和心跳
看看心跳处理,在启动双端的时候:
client的处理:
b.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(// 设定 IdleStateHandler 心跳检测每 5 秒进行一次写检测// write()方法超过 5 秒没调用,就调用 userEventTriggernew IdleStateHandler(0, beatTime, 0, TimeUnit.SECONDS),new RpcMessageDecoder(),new RpcMessageEncoder(),new ClientSocketHandler());}});
client-handler里面:
public class ClientSocketHandler extends SimpleChannelInboundHandler<RpcMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception {// 拿到响应值Response msg = (Response) rpcMessage.getData();long requestId = rpcMessage.getRequestId();// 拿到此次请求的id,对应的缓存信息SyncWriteFuture future = (SyncWriteFuture) SyncWriteMap.syncKey.get(requestId);// 这里拿到了结果,就设置响应值if (future != null) {future.setResponse(msg);}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {// 心跳IdleState state = ((IdleStateEvent) evt).state();if (state == IdleState.WRITER_IDLE) {SimpleRpcLog.info("write idle happen [{}]", ctx.channel().remoteAddress());Channel channel = ctx.channel();RpcMessage rpcMessage = new RpcMessage();rpcMessage.setSerializeType(SerializeType.PROTOSTUFF.getValue());rpcMessage.setCompressTye(CompressType.GZIP.getValue());rpcMessage.setMessageType(MessageType.HEARTBEAT.getValue());channel.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);}} else {super.userEventTriggered(ctx, evt);}}}
•new IdleStateHandler
:这里启动的时候进行心跳连接,根据传进来的心跳时间进行判断,如果指定时间内没有写操作,则调用userEventTriggered
;这里会去发送心跳消息维持连接;•使用自定义的编解码,这里前面单独讲了;•客户端的channelRead0
里面会设置响应,然后释放锁,同时上面讲的工具类里面的get
方法就能拿到对应的结果,最终返回;
server的处理:
.childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(// 30 秒之内没有收到客户端请求的话就关闭连接new IdleStateHandler(stopConnectTime, 0, 0, TimeUnit.SECONDS),new RpcMessageDecoder(),new RpcMessageEncoder(),new ServerSocketHandler());}});
server-handler的处理:
public class ServerSocketHandler extends SimpleChannelInboundHandler<RpcMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {try {// 不理心跳消息if (rpcMessage.getMessageType() != MessageType.REQUEST.getValue()) {return;}// 拿到请求参数Request msg = (Request) rpcMessage.getData();//调用Class<?> classType = ClassLoaderUtils.forName(msg.getInterfaceName());Method addMethod = classType.getMethod(msg.getMethodName(), msg.getParamTypes());// 从缓存中里面获取bean信息Object objectBean = SimpleRpcServiceCache.getService(msg.getAlias());// 进行反射调用Object result = addMethod.invoke(objectBean, msg.getArgs());//反馈Response response = new Response();response.setRequestId(rpcMessage.getRequestId());response.setResult(result);// 构建返回值RpcMessage responseRpcMsg = RpcMessage.copy(rpcMessage);responseRpcMsg.setMessageType(MessageType.RESPONSE.getValue());responseRpcMsg.setData(response);ctx.writeAndFlush(responseRpcMsg);//释放ReferenceCountUtil.release(msg);} catch (Exception e) {e.printStackTrace();}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {// 处理空闲状态的if (evt instanceof IdleStateEvent) {IdleState state = ((IdleStateEvent) evt).state();if (state == IdleState.READER_IDLE) {SimpleRpcLog.info("idle check happen, so close the connection");ctx.close();}} else {super.userEventTriggered(ctx, evt);}}}
•服务端维持心跳的原理是stopConnectTime
对应时间内没有读到数据,那么会跟客户端断开连接,这里的话是可以进行配置的,如果没有读区到,在server-handler里面的userEventTriggered
会去断开连接ctx.close()
;•channelRead0
里面做的事情就是拿到请求参数,然后从服务缓存中拿到bean,然后进行反射调用,拿到调用结果之后,在写出;这里就不需要做等待处理,直接入站经过client的handler;
代理整合发送功能
对外提供一个代理类RpcProxy
,一般服务整合的过程中,需要用到指定接口的,可以在bean注入的时候选择用代理创建:
public class RpcProxy {public static <T> T invoke(Class<T> interfaceClass, CommonConfig config) {InvocationHandler handler = new RpcInvocationHandler(config);ClassLoader classLoader = ClassLoaderUtils.getCurrentClassLoader();T result = (T) Proxy.newProxyInstance(classLoader, new Class[]{interfaceClass}, handler);return result;}}
这里面的CommonConfig
是一些公共配置,主要的处理逻辑是在RpcInvocationHandler
里面,然后里面主要就两个invoke
和connect
private void connect(Request request) {channelFuture = ConnectCache.getChannelFuture(request);if (this.channelFuture != null && this.channelFuture.channel().isOpen()) {return;}synchronized (this) {if (this.channelFuture != null && this.channelFuture.channel().isOpen()) {return;}//获取通信channelif (null == this.channelFuture) {RpcClientSocket clientSocket = new RpcClientSocket(request);executorService.submit(clientSocket);int tryNum = Objects.isNull(request.getRetryNum()) || request.getRetryNum() <= 0 ? 100 : request.getRetryNum();for (int i = 0; i < tryNum; i++) {if (null != this.channelFuture) {break;}try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}this.channelFuture = clientSocket.getFuture();}}if (null == this.channelFuture) {throw new NettyInitException("客户端未连接上服务端,考虑增加重试次数");}request.setChannelFuture(channelFuture);ConnectCache.saveChannelFuture(request);}}
这里有几点:
•首先尝试从连接缓存中拿到对应的连接,如果存在且通道是开启的,那么不需要重新连接;•反之需要从注册中心拿到对应服务器的信息,进行连接,并保存此次连接;•这里存在一个配置,因为客户端连接服务端是异步连接操作,这里会根据channelFuture
是否返回来判断是否连接成功,所以这里tryNum
来重试判断是否连接成功;
然后就是invoke
方法:
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 连接客户端connect(buildRequest());String methodName = method.getName();Class[] paramTypes = method.getParameterTypes();// 排除Object的方法调用if (JavaKeywordConstant.TO_STRING.equals(methodName) && paramTypes.length == 0) {return this.toString();} else if (JavaKeywordConstant.HASHCODE.equals(methodName) && paramTypes.length == 0) {return this.hashCode();} else if (JavaKeywordConstant.EQUALS.equals(methodName) && paramTypes.length == 1) {return this.equals(args[0]);}Request request = new Request();ConsumerConfig consumerConfig = commonConfig.getConsumerConfig();BaseConfig baseConfig = commonConfig.getBaseConfig();//设置参数request.setMethodName(methodName);request.setParamTypes(paramTypes);request.setArgs(args);request.setBeanName(consumerConfig.getBeanName());request.setInterfaceName(consumerConfig.getInterfaceName());request.setChannel(channelFuture.channel());request.setAlias(consumerConfig.getAlias());request.setSerializer(baseConfig.getSerializer());request.setRegister(baseConfig.getRegister());request.setCompressor(baseConfig.getCompressor());request.setTimeout(baseConfig.getTimeout());// 发送请求Response response = null;try {response = new SyncWrite().writeAndSync(request.getChannel(), request,Objects.isNull(request.getTimeout()) ? 30L : request.getTimeout());} catch (Exception e) {e.printStackTrace();SimpleRpcLog.error(e.getMessage());}//异步调用return response.getResult();}
•首先是建立连接操作;•然后是排除Object的方法调用;•然后就是构建参数进行写操作:new SyncWrite().writeAndSync
;拿到response
则返回结果,完成此次的调用;
rpc协议和通讯大致就是这些东西,然后就是整合操作;




