暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

手写RPC-整合Spring、SpringBoot

爱搞技术的吴同学 2022-10-14
525

光有core也可以进行rpc调用,但目前的话还是需要整合到主流的框架中,目前我只会spring相关的,所以接下来就是整合spring和springboot的操作;其实就是注解和非注解的形式;

整合spring

其实这里主要偏重于spring一些功能讲解,因为用了xml处理器,我觉得挺有用的,所以这里也放上来讲下;但是spring后面我没有更新了,很多代码应该是没有很完善的,有很多优化的地方;项目名称是:spring-simple-rpc

项目目录

其实不是很复杂:

    .
    └── spring
    ├── RpcNamespaceXmlHandler.java
    ├── RpcXmlBeanDefinitionParser.java
    ├── annotation
    │ └── Enable


    SimpleRpc.java
    ├── beans
    │ ├── ConsumerBean.java
    │ ├── ProviderBean.java
    │ ├── ServerBean.java
    │ └── parser
    │ └── ParseServerBean.java
    ├── exception
    │ └── BeanNotFoundException.java
    └── transfer
    ├── BaseData.java
    └── DataMap.java


    resources
    └── META-INF
    ├── rpc.xsd
    ├── spring.handlers
    └── spring.schemas




    下面就稍微看看;

    接口详解

    这里的话主要就是几个bean重要:ServerBean
    ProviderBean
    ConsumerBean
    ;分别对应服务端启动,生产者提供服务,消费者消费服务;

    ServerBean

      public class ServerBean extends ServerConfig implements ApplicationContextAware {


      ExecutorService executorService = Executors.newFixedThreadPool(10);


      @Override
      public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
      SimpleRpcUrl simpleRpcUrl = ParseServerBean.parse(this);
      DataMap.DATA_TRANSFER_MAP.put(DataMap.DATA_TRANSFER, simpleRpcUrl);
      启动注册中心
      RegisterCenterFactory.create(simpleRpcUrl.getType()).init(simpleRpcUrl);
      SimpleRpcLog.info("注册中心初始化:{}", address);
      初始化服务端
      RpcServerSocket serverSocket = new RpcServerSocket(new Request());
      executorService.submit(serverSocket);
      while (!serverSocket.isActiveSocketServer()) {
      try {
      Thread.sleep(500);
      } catch (InterruptedException ignore) {
      }
      }
      SimpleRpcLog.info("初始化生产端服务完成 {} {}", LocalAddressInfo.LOCAL_HOST, LocalAddressInfo.PORT);
      }
      }

      这里就是实现ApplicationContextAware
      ,然后在setApplicationContext
      做处理,然后ServerConfig
      这里类似于springboot的propreties的操作,这里使用xml进行处理,这里就可以先理解为我们可以拿到ServerConfig
      里面的值了;

      setApplicationContext
      里面就是解析配置参数address
      ,然后初始化注册中心,然后就是启动服务端,这里的new Request()
      理论上是需要构建的,但是这里spring已经很久没改了,就不构建了;

      ProviderBean

        public class ProviderBean extends ProviderConfig implements ApplicationContextAware {


        @Resource
        private ServerBean serverBean;


        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SimpleRpcUrl simpleRpcUrl = ParseServerBean.parse(serverBean);
        RegisterCenter registerCenter = RegisterCenterFactory.create(simpleRpcUrl.getType());
        if (Objects.isNull(registerCenter)) {
        throw new BeanNotFoundException("注册中心未初始化");
        }
        Request providerRequest = new Request();
        providerRequest.setInterfaceName(interfaceName);
        providerRequest.setBeanName(beanName);
        providerRequest.setAlias(alias);
        providerRequest.setHost(LocalAddressInfo.LOCAL_HOST);
        providerRequest.setPort(LocalAddressInfo.PORT);
        注册生产者
        boolean flag = registerCenter.register(Request.request2Register(providerRequest));
        SimpleRpcServiceCache.addService(alias, applicationContext.getBean(beanName));
        SimpleRpcLog.info("注册生产者:{}, 是否成功:{} ", JSON.toJSONString(providerRequest), flag);
        }
        }

        这里跟上面切入点是一样的,但是这里有点就是private ServerBean serverBean;
        先把这个bean注入,这里是为了保证bean的注入先后顺序;

        然后再合理就是把接口信息注册到注册中心,然后将服务存到缓存里面,服务提供者做这些就够了;

        ConsumerBean

          public class ConsumerBean<T> extends ConsumerConfig implements FactoryBean<T> {


          @Resource
          private ServerBean serverBean;


          @Override
          public T getObject() throws BeanNotFoundException, NettyInitException, ClassNotFoundException {
          RegistryConfig registryConfig = ParseServerBean.serverToRegister(serverBean);
          BaseConfig baseConfig = new BaseConfig();
          baseConfig.setLoadBalanceRule("round");
          CommonConfig config = new CommonConfig();
          config.setConsumerConfig(this);
          config.setRegistryConfig(registryConfig);
          config.setBaseConfig(baseConfig);
          return (T) RpcProxy.invoke(ClassLoaderUtils.forName(interfaceName), config);
          }


          @Override
          public Class<?> getObjectType() {
          try {
          if (!Objects.isNull(interfaceName)) {
          return ClassLoaderUtils.forName(interfaceName);
          }
          return null;
          } catch (ClassNotFoundException e) {
          return null;
          }
          }


          @Override
          public boolean isSingleton() {
          return true;
          }
          }

          这里的ConsumerBean
          是一个FactoryBean
          ;在创建bean实例的时候,会从这里来加载;这是spring相关的东西,不深究,然后这里同样是不完善的,很多配置都没有,这里的话就是取调用之前分析的代理方法RpcProxy.invoke
          ;所以消费者在通过@Autowired
          去注入对象的时候,会放在三级缓存里面,然后往上传递,这里就生成了动态代理类,下次调用就会就走之前分析的逻辑,走handler里面处理;这里也比较简单;

          配置扫描

          这里涉及到了xsd的东西,也就是xml命名空间定义的问题,比较有意思,因为之前我没接触过;首先我们前面说了,ServerConfig
          这些类,其实就是一些配置定义,然后各个业务bean继承,然后注入到spring中,这里是以xml形式注册的,所以需要有对应的xsd文件;

          先看看spring里面的注入形式:

            public class RpcNamespaceXmlHandler extends NamespaceHandlerSupport {


            **
            * 命名空间初始化
            */
            @Override
            public void init() {
            registerBeanDefinitionParser("server", new RpcXmlBeanDefinitionParser(ServerBean.class));
            registerBeanDefinitionParser("provider", new RpcXmlBeanDefinitionParser(ProviderBean.class));
            registerBeanDefinitionParser("consumer", new RpcXmlBeanDefinitionParser(ConsumerBean.class));
            }
            }

            这里就是继承NamespaceHandlerSupport
            类,但是这里有个问题,好想不能保证这几个bean的加载顺序,目前也不知道怎么处理,所以前面就是会在ProviderBean、ConsumerBean里面注入ServerBean,通过这样来保证顺序;

            这里写好了处理器,然后就是讲处理器加载到spring中,我们需要在resources
            资源文件下面新建文件srping.handlers
            :

              http\://rpc.simple.com/schema/rpc=com.simple.rpc.spring.RpcNamespaceXmlHandler

              然后就是加载定义的xsd文件内容,也就是配置schema

              spring.schemas

                http\://rpc.simple.com/schema/rpc/rpc.xsd=META-INF/rpc.xsd

                然后这里就会去加载rpc.xsd这个文件里面的内容(粘贴部分代码):

                  <xsd:element name="server">
                  <xsd:element name="provider">
                  <xsd:element name="consumer">


                  现在就是配置都加载到了,就是解析配置;我们只需要定义一个paser,RpcXmlBeanDefinitionParser

                    public class RpcXmlBeanDefinitionParser implements BeanDefinitionParser {
                    private final Class<?> beanClass;
                    RpcXmlBeanDefinitionParser(Class<?> beanClass) {
                    this.beanClass = beanClass;
                    }
                    @Override
                    public BeanDefinition parse(Element element, ParserContext parserContext) {
                    RootBeanDefinition beanDefinition = new RootBeanDefinition();
                    beanDefinition.setBeanClass(beanClass);
                    beanDefinition.setLazyInit(false);
                    String beanName = element.getAttribute("id");
                    parserContext.getRegistry().registerBeanDefinition(beanName, beanDefinition);
                    for (Method method : beanClass.getMethods()) {
                    if (!isProperty(method, beanClass)) {continue;}
                    String name = method.getName();
                    String methodName = name.substring(3, 4).toLowerCase() + name.substring(4);
                    String value = element.getAttribute(methodName);
                    beanDefinition.getPropertyValues().addPropertyValue(methodName, value);
                    }
                    return beanDefinition;
                    }


                    private boolean isProperty(Method method, Class beanClass) {
                    判断是否是getter、setter方法
                    }
                    }

                    这里就是获取对应配置类的getter、setter方法,然后对其属性赋值;然后配置信息就被加载到了;

                    整合spring大概就是这个原理,然后使用这里就不说了;有专门介绍大大小小功能的文章;

                    整合Springboot

                    相较于spring的整合,Springboot整合相对来说也比较简单,只是加了很多注解信息,使用注解的方法来注入服务和引用;先看看注解信息;

                    SimpleRpcService:
                    这个表明一个类是RPC提供的类,然后有个alias
                    方法用来和接口的权限定名+别名来做RPC服务中的唯一值的,一般就是和beanName是同一个值;
                    SimpleRpcReference
                    :这个注解就相当于我们用的@Autowired
                    ,标记这是一个RPC的调用,里面是设计了当前这个rpc接口的负载均衡策略、版本号等,但是后面没去实现;
                    SimpleRpcScan
                    :这个是用来扫描对应包下面的所有的RPC提供的接口,会配合Scanner
                    类来进行包扫描和类加载;

                    项目目录

                      .
                      └── springboot
                      ├── ApplicationClosedEventListener.java
                      ├── ServerInitBeanPostProcessor.java
                      ├── ServiceBeanPostProcessor.java
                      ├── annotaton
                      │ └── SimpleRpcScan.java
                      ├── config
                      │ ├── BootBaseConfig.java
                      │ └── BootRegisterConfig.java
                      └── scanner
                      ├── SimpleRpcScanner.java
                      └── SimpleRpcScannerRegistrar.java


                      类扫描原理

                      SimpleRpcScanner

                        public class SimpleRpcScanner extends ClassPathBeanDefinitionScanner {


                        @SafeVarargs
                        public SimpleRpcScanner(BeanDefinitionRegistry registry, Class<? extends Annotation>... annotationTypes) {
                        // 注册bean信息
                        super(registry);
                        // 获取包含对应注解的类
                        for (Class<? extends Annotation> annotationType : annotationTypes) {
                        super.addIncludeFilter(new AnnotationTypeFilter(annotationType));
                        }
                        }
                        }

                        这里就是将使用了指定注解的类加载到过滤器中;

                        然后配合注册处理器使用,将扫描到的bean信息注入到spring容器中:

                          public class SimpleRpcScannerRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware {


                          /**
                          * 服务扫描的基础包,是 @RpcScan 的哪个属性
                          */
                          private static final String SERVER_SCANNER_BASE_PACKAGE_FIELD = "basePackages";


                          /**
                          * 默认基础包
                          */
                          private static final String[] DEFAULT_SCANNER_BASE_PACKAGES = {"com.simple.rpc"};


                          private ResourceLoader resourceLoader;


                          @Override
                          public void setResourceLoader(ResourceLoader resourceLoader) {
                          this.resourceLoader = resourceLoader;
                          }


                          /**
                          * 注册bean信息,丢到spring容器中
                          *
                          * @param importingClassMetadata
                          * @param registry
                          */
                          @Override
                          public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
                          //扫描注解
                          Map<String, Object> annotationAttributes = importingClassMetadata.getAnnotationAttributes(SimpleRpcScan.class.getName());
                          SimpleRpcScanner serviceScanner = new SimpleRpcScanner(registry, SimpleRpcService.class);
                          if (resourceLoader != null) {
                          serviceScanner.setResourceLoader(resourceLoader);
                          }
                          String[] serviceBasePackages = (String[]) annotationAttributes.get(SERVER_SCANNER_BASE_PACKAGE_FIELD);
                          if (serviceBasePackages.length < 1) {
                          serviceBasePackages = DEFAULT_SCANNER_BASE_PACKAGES;
                          }
                          int serviceCount = serviceScanner.scan(serviceBasePackages);
                          SimpleRpcLog.info(StrUtil.format("serviceScanner. packages={}, count={}", serviceBasePackages, serviceCount));
                          }
                          }

                          这里的逻辑就是,先是拿到所有的@SimpleRpcScan
                          注解信息,然后在构建注解扫描器:

                          new SimpleRpcScanner(registry, SimpleRpcService.class);
                          然后获取所有的basePackages
                          路径,配合serviceScanner.scan(serviceBasePackages)
                          将所有符合的类都注入到spring中;当然这里只是元数据信息注册,并没有实例化对象;

                          服务初始化和注册

                          先看看初始化,没想到放在那个节点合适;所以就继承了BeanPostProcessor
                          ,之前的想法是想放在SpringBoot的启动时机,但是需要使用到配置类的;我这里没去验证启动时间和配置类加载的顺序,所以就使用现在这套方案;看看代码:

                            @Override
                            public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
                            if (initFlag) {
                            init(bootRegisterConfig);
                            initFlag = false;
                            }
                            return bean;
                            }


                            public void init(BootRegisterConfig bootRegisterConfig) throws BeansException {
                            SimpleRpcUrl simpleRpcUrl = SimpleRpcUrl.toSimpleRpcUrl(bootRegisterConfig);
                            // 保存注册中心信息
                            RegisterInfoCache.save(bootRegisterConfig);
                            //启动注册中心
                            RegisterCenterFactory.create(simpleRpcUrl.getType()).init(simpleRpcUrl);
                            SimpleRpcLog.info("注册中心初始化:{}", bootRegisterConfig.getAddress());
                            //初始化服务端
                            RpcServerSocket serverSocket = new RpcServerSocket(buildRequest(bootBaseConfig));
                            executorService.submit(serverSocket);
                            while (!serverSocket.isActiveSocketServer()) {
                            try {
                            Thread.sleep(500);
                            } catch (InterruptedException ignore) {
                            }
                            }
                            SimpleRpcLog.info("初始化生产端服务完成 {} {}", LocalAddressInfo.LOCAL_HOST, LocalAddressInfo.PORT);
                            }


                            这里的话会去判断是否已经记初始化过了;

                            初始化跟spring的差不多,也是初始化注册中心和初始化netty服务端;

                            服务启动就这些东西,然后就是服务注册操作了,同样是ServiceBeanPostProcessor
                            切入,这里跟上面初始化操作是根据Ordered
                            来实现加载顺序的;先看服务注册:

                              public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
                              // 判断是否是被@SimpleRpcService注解的类
                              SimpleRpcService rpcService = bean.getClass().getAnnotation(SimpleRpcService.class);
                              SimpleRpcUrl simpleRpcUrl = SimpleRpcUrl.toSimpleRpcUrl(bootRegisterConfig);
                              // rpc 服务需要发布到注册中心
                              if (rpcService != null) {
                              RegisterCenter registerCenter = RegisterCenterFactory.create(simpleRpcUrl.getType());
                              Request request = new Request();
                              String applicationName = ApplicationCache.APPLICATION_NAME;
                              request.setApplicationName(applicationName);
                              request.setLoadBalanceRule(bootBaseConfig.getLoadBalanceRule());
                              request.setSerializer(bootBaseConfig.getSerializer());
                              request.setCompressor(bootBaseConfig.getCompressor());
                              request.setRegister(bootBaseConfig.getRegister());
                              request.setWeights(bootBaseConfig.getWeights());
                              request.setHost(LocalAddressInfo.LOCAL_HOST);
                              request.setPort(LocalAddressInfo.PORT);
                              request.setHealth(HealthStatus.IS_HEALTH.getCode());
                              Class<?>[] interfaces = bean.getClass().getInterfaces();
                              if (!CollectionUtils.isEmpty(Arrays.asList(interfaces))) {
                              for (Class<?> anInterface : interfaces) {
                              String alias = getBeanName(anInterface.getCanonicalName());
                              request.setAlias(StrUtil.isBlank(rpcService.alias()) ? alias : rpcService.alias());
                              request.setBeanName(alias);
                              request.setInterfaceName(anInterface.getCanonicalName());
                              String registerKey = registerCenter.register(Request.request2Register(request));
                              // 将对应的bean存入到缓存之中
                              SimpleRpcServiceCache.addService(registerKey, bean);
                              }
                              }
                              }
                              return bean;
                              }

                              拿到注解信息,然后拿到这个bean 可能的所有实现的接口,然后拿到对应的接口的权限定名+ alias作为key存入到注册中心,然后将该baan存到缓存之中,在netty-server的handler中会使用到;这里已经把服务元信息注册到注册中心了,然后bean也注入到缓存中了;spring中也有了;这里就是注册;

                              然后就是服务的使用了,同样在这个类里面,放在了另外一个方法里面:

                                public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
                                Field[] fields = bean.getClass().getDeclaredFields();
                                for (Field field : fields) {
                                SimpleRpcReference rpcReference = field.getAnnotation(SimpleRpcReference.class);
                                if (rpcReference != null) {
                                // 生成代理对象
                                Object proxy = null;
                                try {
                                ConsumerConfig consumerConfig = buildConsumerConfig(field, rpcReference);
                                proxy = RpcProxy.invoke(ClassLoaderUtils.forName(consumerConfig.getInterfaceName()), buildCommonConfig(consumerConfig));
                                } catch (ClassNotFoundException e) {
                                e.printStackTrace();
                                }
                                field.setAccessible(true);
                                try {
                                // 设置字段
                                field.set(bean, proxy);
                                } catch (IllegalAccessException e) {
                                SimpleRpcLog.error("field.set error. bean={}, field={}", bean.getClass(), field.getName(), e);
                                }
                                }
                                }
                                return bean;
                                }

                                这里就是去获取对象里面的字段,一般我们引入类的方式就是通过AService aService
                                这种方式注入,然后这里的话就是先为该类生成对应的代理类field.set(bean, proxy);
                                然后实际调用延迟到handler中去处理,这里就是服务发现,也比较简单;

                                整合就是这些了!!


                                文章转载自爱搞技术的吴同学,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                评论