暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Dubbo RPC远程调用过程源码分析(服务提供者)

Java艺术 2021-09-08
330
关注“Java艺术”一起来充电吧!

在前面分析Dubbo注册中心层源码的文章中,我们知道,服务的导出与引入由RegistryProtocol调度完成。对于服务提供者,服务是先导出再注册到注册中心;对于服务消费者,先将自己注册到注册中心,再订阅事件,由RegistryDirectory将所有服务提供者转为Invoker。


那么,服务导出在RPC层都做了什么事情,以及服务提供端是如何处理请求、响应请求的,本篇文章内容主要为:


  • RPC层服务导出

  • 总结整个服务导出流程

  • 接收到一个请求的处理过程


RPC层Dubbo协议的服务导出


RPC层封装了PRC调用逻辑,以InvocationResult为中心,Invocation为请求消息描述消费端想调用那个接口的哪个方法,以及参数是什么,可以理解为Servlet的HttpServletRequest;Result为返回结果,可以理解为Servlet的HttpServletResponse。扩展接口为ProtocolInvokerExporter,如dubbo协议的DubboProtocol。Exporter定义服务的导出和解除导出,Invoker可以理解为Spring MVC的MethodHandler,真正处理请求的就是Invoker。


本篇文章主要以Dubbo协议为例,传输层使用Netty4(默认的),分析服务的导出过程,以及接收到请求的处理全过程。在分析注册中心层的RegistryProtocol的export方法时,有这么一行代码因为与注册中心没有多大关系而被我们选择忽略。


    【RegistryProtocol类的export方法】
    // 导出invoker,DubboProtocol
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);


    这就是我们将要分析的RPC层代码的入口,我们将从doLocalExport方法开始分析。

      private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
         String key = getCacheKey(originInvoker);
         return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
             // 包装Invoker
      Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
             // 根据SPI自适应扩展点机器,导出服务
             return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
      });
      }


      重点关注这行(Exporter<T>) protocol.export(invokerDelegate)。这里又有一个protocol,这个protocol是谁,如果服务提供者使用的是默认的dubbo协议,那么这个protocol就是DubboProtocol。假设当前服务提供者的url如下。

        dubbo://10.1.0.251:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&bean.name=.....


        从url中获取到服务提供者使用的协议是dubbo,所以根据SPI自适应扩展点机制,拿到的就是DubboProtocol

          dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol


          知道protocol是DubboProtocol后,我们继续分析DubboProtocol的export方法。

             @Override
            public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
                    URL url = invoker.getUrl();
            // 导出服务,key=分组/接口名:版本号:端口号
            String key = serviceKey(url);
            DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
                    exporterMap.put(key, exporter);
                    // 打开服务
            openServer(url);
                    optimizeSerialization(url);
            return exporter;
             }


            export方法就是将Invoker转为Exporter,然后调用openServer确保底层服务打开。往下分析我们将接触到信息交换层与传输层。我们继续看openServer方法。

              private void openServer(URL url) {
              String key = url.getAddress(); // return port <= 0 ? host : host + ":" + port;
              ExchangeServer server = serverMap.get(key);
              if (server == null) {
              synchronized (this) {
              server = serverMap.get(key);
                           if (server == null) {
              serverMap.put(key, createServer(url));
                           }
                       }
                  }
              }

              openServer方法使用了双重检测确保线程安全。url.getAddress()作为key,serverMap缓存所有打开的Server(ExchangeServer),如果不存在则调用createServer方法创建并打开Server。根据Service的ip和端口号决定打开多少个Server,而一个进程只能开启一个监听端口,一个网卡只有一个IP,所以整个进程都只会打开一个Server。那这个Server到底是什么,openServer做了什么?


                private ExchangeServer createServer(URL url) {
                        url = URLBuilder.from(url)
                // 服务器关闭时发送只读事件
                                .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                                // 加上心跳配置,默认心跳事件为1分钟
                .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                                // 加上编码解码器配置
                .addParameter(CODEC_KEY, DubboCodec.NAME)
                                .build();
                ExchangeServer server;
                try {
                server = Exchangers.bind(url, requestHandler);
                } catch (RemotingException e) {
                throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
                        }
                return server;
                }


                为了简单,我去掉了一些无关紧要的代码。Exchangers是一个工具类,bind方法封装了信息交换层的创建逻辑,参数url依然要传递给信息交换层,因为url可能携带我们配置的exchanger参数。以及由信息交换层衔接的传输层,也需要从url中获取配置。本篇不再展开分析。最后传输层将创建一个NettyServer,在NettyServer的构造函数中开启一个Netty服务。这部分内容我将会以单独一篇文章详细解析。


                总结整个服务导出流程


                回顾往期文章,从深入理解Dubbo源码(三),Dubbo与Spring的整合之注解方式这篇文章中,我们知道,服务的导出入口在config层的ServiceConfig的onApplicationEvent方法,即在Spring初始化完成之后,开始导出服务;从Dubbo分层架构之服务注册中心层的源码分析(上)以及Dubbo分层架构之服务注册中心层的源码分析(下)这两篇文章,我们知道,服务先由注册中心层调用rpc层导出服务之后再注册到注册中心。


                从本篇文章中,我们知道,服务在RPC层的导出将根据协议创建Server,以Dubbo协议为例,最终会创建一个NettyServer。所以,整个服务导出流程如下图所示。


                服务导出总体流程图


                整个服务的导出过程都是由dubbo.URL将配置层、注册中心层、RPC层、信息交换层、传输层串起来的。当前,还有一些细节没有分析到。


                接收到一个请求的调用过程


                以 dubbo2.7.2版本源码中的demo为例,我们先屏蔽底层的信息交换层和传输层的调用过程逻辑,从rpc层开始分析,当服务端接收到一个请求时RPC层是怎么找到DemoService服务并调用的,以及响应过程。


                以Debug方式启动demo的服务提供者,再启动服务消费者,在DubboProtocolrequestHandler字段的reply方法中下一个断点,当接收到消费端的请求时,将会停在这个断点。下面分析为什么会停在这个断点。


                DubboProtocol类的requestHandler字段


                前面分析的,在DubboProtocol的createServer方法中,调用Exchangers的bind方法时传入了一个requestHandler,正是断点停在的ExchangeHandler。这个ExchangeHandler的reply方法将是服务端处理请求的RPC层入口。

                  Exchangers.bind(url, requestHandler)


                  这个requestHandler是一个匿名内部类,类型为ExchangeHandlerAdapter,实现了ExchangeHandler接口ExchangeHandler又是继承ChannelHandler的,所以这个requestHandler会被注册到Netty的pipline中,但不是直接注册到netty的,因为此ChannelHandler并非Netty的Handler,而是dubbo抽象传输层的ChannelHandler


                  每个ChannelHandler的实现类都是继承ChannelHandlerDelegate,即实现委托模式。所以每ChannelHandler中都持有前一个ChannelHandler。每New一个ChannelHandler都要传入一个ChannelHandler,所以就构成了一条链。


                  ChannelHandler链


                  在Netty接收到消息时,先调用最顶层的NettyServerHandler的channelRead方法,再到Dubbo抽象的ChannelHandler的received方法,最后一层层往下传递处理。先是解码器DecodeHandler,再到信息交换层处理器HeaderExchangeHandler。在信息交换层中改变调用方式,调用ExchangeHandler的reply(回复)方法,交给RPC层处理请求。


                    public interface ExchangeHandler extends ChannelHandlerTelnetHandler {
                        CompletableFuture<Object> reply(ExchangeChannel channel, 
                         Object request) throws RemotingException;
                    }


                    reply方法的第二个参数类型为Request。可以理解为Servlet的doPost方法接收的HttpServletRequest,包装客户端请求的全部参数信息。如请求id、版本号、请求元数据描述。关于请求元数据描述有必要解析下。



                    请求元数据描述类型为RpcInvocation(org.apache.dubbo.rpc.Invocation)。Invocation字段包括:序列化标志(决定使用哪个序列化协议,比如2就是hession2)、远程调用的方法名(sayHello)、方法参数类型(String),调用传递的参数(hello)。请求附带的参数:请求路径(dubbo协议为接口名),dubbo版本、接口名、接口版本、分组等。


                    我们继续从断点开始分析,先看reply方法的前两行代码。

                      Invocation inv = (Invocation) message;
                      Invoker<?> invoker = getInvoker(channel, inv);


                      经过上面的分析,我们知道,message的类型为Invocation,且是由消费端发送过来的。第二行代码就是将Invocation转为Invoker,因为Invoker才是调用目标方法的入口,就像Spring MVC的HandlerMethod。

                        Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
                                ........
                                // serviceKey=分组/接口名:版本号:端口号
                        String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY));
                           DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
                        return exporter.getInvoker();
                        }


                        getInvoker方法的代码是不是很眼熟?在服务导出时根据ip、端口号、接口版本、分组拼接成一个key,并将Invoker封装成Exporter缓存到exporterMap中,现在只是从缓存中拿出来用而已。Spring MVC的HandlerMethod也是用一个Map容易缓存的,key为路径,value为HandlerMethod,没什么区别。


                        最后又从Exporter中拿到Invoker,此Invoker是什么呢?首先这是一个被层层包装的Invoker,既有代理,也有委托,我们只关心它最初是什么样的。因为不管是代理还是委托,最后都会调用到最初始的那个Invoker。那Dubbo层层封装的目的是什么?稍后分析。



                        还记得图中这几行代码吗,前两篇文章才分析的。在配置层ServiceConfig的doExportUrlsFor1Protocol方法中。而图中的圈圈ref就是DemoService的实现类DemoServiceImpl的实例。



                        回到requestHandler的reply方法,继续往下分析,我先简化下reply的代码。

                          @Override
                          public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
                          // 请求消息
                          Invocation inv = (Invocation) message;
                          // 获取Invoker
                              Invoker<?> invoker = getInvoker(channel, inv);
                          // 请求上下文设置客户端ip地址,不关心这个
                          RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                          // 调用服务,获取响应结果
                          Result result = invoker.invoke(inv);
                          return result.completionFuture().thenApply(Function.identity());
                          }

                          简化代码后好看多了,reply变得简单了,就是根据请求消息获取到Invoker,调用Invoker处理请求并获取结果,最后返回给客户端,只是返回的逻辑涉及到底层,此处不再深入分析。


                          接收到客户端的请求消息类型解包后是一个Request,而reply方法的第二个参数message类型是Invocation,这是在信息交换层HeaderExchangeHandler中转换的,Invocation是Request中获取的,即getDate方法


                          我们接着看invoker.invoke(inv)的整个调用链。回过头去跟踪下整个服务导出的链路,看看最初的Invoker是什么类型,以及都经过了多少层包装。


                          Invoker包装链


                          如图所示。Invoker首先是由代理工厂创建的,DemoServiceImpl的代理类。先是经过配置层ServiceConfig的包装,变为DelegateProviderMetaDataInvoker;图中漏了一个注册中心层包装的InvokerDelegate,不是很重要;再经过过滤器层的包装,变成Filter,至于经过多少个过滤器的包装就得看配置了。


                          因此,DubboProtocol的requestHandler的reply方法中拿到的invoker是经过过滤器层ProtocolFilterWrapper包装的。



                          分析到这,你应该知道过滤器是怎么起作用了吧。请求先是经过过滤器处理,如果过滤器都不过滤请求,会先到配置层包装的DelegateProviderMetaDataInvoker。跳过过滤器,直接在DelegateProviderMetaDataInvoker的invoker方法中下断点。



                          也没啥处理的,继续往回走,但是往回走就下不了断点了,因为是javasist动态生成的代码,所以我们要看的就是动态代理工厂给我们生成的代理类的代码。这得借助一些工具。


                          在org.apache.dubbo.common.bytecode.ClassGenerator的toClass方法中加入如下代码,将生成的类字节码输出到一个路径下,再通过反编译工具查看。当然,你要是能从乱糟糟的javassist拼接的字符串看出是驴是马,我也不说什么了。



                          使用一个反编译工具打开class文件,我用的是Luyten这个工具。



                          生成代理类之后还会包装一层代理,将调用Invoker的invoker方法转为调用动态代理类的invokerMethod方法。具体代码看AbstractInvoker这个类。

                            @Override
                            public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
                                    // 动态代理包装DemoServiceImpl
                            final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
                                    // 再给生成的动态代理类包装一层代理
                            return new AbstractProxyInvoker<T>(proxy, type, url) {
                            @Override
                            protected Object doInvoke(T proxy, String methodName,
                            Class<?>[] parameterTypes,
                            Object[] arguments) throws Throwable {
                            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
                            }
                            };
                            }



                            似乎哪里不对?Invoker要求invoke方法返回一个Result,但是javassist生成的动态代理类的invokeMethod方法返回值是一个Object类型,是什么时候转成Result的?invokeMethod是由AbstractProxyInvoker的doInvoker方法调用的,所以将Object包装为Result就是在AbstractProxyInvoker的invoker方法中实现的。



                            到这,整个方法的调用流程就都清楚了。你知道过滤器是什么时候被调用了吗?你知道熔断器为什么基于过滤器实现了吗?但要知道处理请求的线程调度,以及为何线程池满了抛出rpc远程调用异常,这些还需要继续深入信息交换层和传输层才能找到答案。


                            往期原创精选
                            Dubbo分层架构之服务注册中心层的源码分析(下)
                            Dubbo分层架构之服务注册中心层的源码分析(上)
                            Dubbo自适应随机负载均衡策略的实现
                            源码分析Dubbo负载均衡策略的权重如何动态修改
                            深入理解Dubbo源码(三),Dubbo与Spring的整合之注解方式
                            深入理解Dubbo源码(二),分析Java SPI与Dubbo SPI的实现源码
                            深入理解Dubbo源码(一),如何高效的阅读Dubbo框架源码



                            微信公众号:Java艺术
                            扫码关注最新动态,查看往期精彩


                            文章转载自Java艺术,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                            评论