暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

「DUBBO系列」并发控制详解

JAVA前线 2020-06-07
376

IT徐胖子原创本文未授权请勿转载


1 文章概述

生产者和消费者提供了并发控制配置,通过并发控制配置项可以实现限流功能,从而有效进行系统保护。本文我们介绍生产者和消费者并发控制怎样配置并且在源码层面分析并发控制实现原理。


2 生产者

2.1 配置方式

HelloService服务每个方法在每个生产节点执行并发数不超过100

    <beans>
    <dubbo:registry address="zookeeper://127.0.0.1:2181" >
    <dubbo:protocol name="dubbo" port="8888" >
    <dubbo:service executes="100" interface="com.itxpz.dubbo.demo.provider.HelloService" ref="helloService" >
    </beans>


    2.2 源码分析

    ExecuteLimitFilter过滤器是生产者并发控制核心,我们分析构建过滤器链路源码

      public class ProtocolFilterWrapper implements Protocol {
      private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
      Invoker<T> last = invoker;
      List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
      if (!filters.isEmpty()) {
      for (int i = filters.size() - 1; i >= 0; i--) {
      final Filter filter = filters.get(i);
      final Invoker<T> next = last;


      // 构造一个简化Invoker
      last = new Invoker<T>() {


      @Override
      public Class<T> getInterface() {
      return invoker.getInterface();
      }


      @Override
      public URL getUrl() {
      return invoker.getUrl();
      }


      @Override
      public boolean isAvailable() {
      return invoker.isAvailable();
      }


      @Override
      public Result invoke(Invocation invocation) throws RpcException {


      // 构造过滤器链路
      Result result = filter.invoke(next, invocation);
      if (result instanceof AsyncRpcResult) {
      AsyncRpcResult asyncResult = (AsyncRpcResult) result;
      asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation));
      return asyncResult;
      } else {
      return filter.onResponse(result, invoker, invocation);
      }
      }


      @Override
      public void destroy() {
      invoker.destroy();
      }


      @Override
      public String toString() {
      return invoker.toString();
      }
      };
      }
      }
      return last;
      }




      @Override
      public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
      // RegistryProtocol不构造过滤器链
      if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
      return protocol.export(invoker);
      }
      // EchoFilter > ClassloaderFilter > GenericFilter > ContextFilter > ExecuteLimitFilter >
      // TraceFilter > TimeoutFilter > MonitorFilter > ExceptionFilter > AbstractProxyInvoker
      Invoker<T> invokerChain = buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER);
      return protocol.export(invokerChain);
      }
      }

      ExecuteLimitFilter源码分析需要注意如果超出最大并发数直接抛出异常

        @Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY)
        public class ExecuteLimitFilter implements Filter {


        @Override
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();


        // 方法名
        String methodName = invocation.getMethodName();


        // 最大并发数
        int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);


        // 超出最大并发数抛出异常
        if (!RpcStatus.beginCount(url, methodName, max)) {
        throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " +
        url + ", cause: The service using threads greater than <dubbo:service executes='" + max + "'/> limited.");
        }
        long begin = System.currentTimeMillis();
        boolean isSuccess = true;
        try {
        // 执行下一个链路节点
        return invoker.invoke(invocation);
        } catch (Throwable t) {
        isSuccess = false;
        if (t instanceof RuntimeException) {
        throw (RuntimeException) t;
        } else {
        throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
        }
        } finally {
        // 增加当前并发数
        RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
        }
        }


        3 消费者

        3.1 配置方式

        HelloService接口每个方法在每个消费节点执行并发数不超过100

          <beans>
          <dubbo:application name="xpz-consumer" >
          <dubbo:registry address="zookeeper://127.0.0.1:2181" >
            <dubbo:reference actives="100" id="helloService" interface="com.itxpz.dubbo.demo.provider.HelloService" />
          </beans>


          3.2 源码分析

          ActiveLimitFilter过滤器是消费者并发控制核心,我们分析构建过滤器链路源码

            public class ProtocolFilterWrapper implements Protocol {
            private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {


            // invoker = DubboInvoker
            Invoker<T> last = invoker;


            // 查询符合条件过滤器列表
            List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
            if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;


            // 构造一个简化Invoker
            last = new Invoker<T>() {
            @Override
            public Class<T> getInterface() {
            return invoker.getInterface();
            }


            @Override
            public URL getUrl() {
            return invoker.getUrl();
            }


            @Override
            public boolean isAvailable() {
            return invoker.isAvailable();
            }


            @Override
            public Result invoke(Invocation invocation) throws RpcException {
            // 构造过滤器链路
            Result result = filter.invoke(next, invocation);
            if (result instanceof AsyncRpcResult) {
            AsyncRpcResult asyncResult = (AsyncRpcResult) result;
            asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation));
            return asyncResult;
            } else {
            return filter.onResponse(result, invoker, invocation);
            }
            }


            @Override
            public void destroy() {
            invoker.destroy();
            }


            @Override
            public String toString() {
            return invoker.toString();
            }
            };
            }
            }
            return last;
            }


            @Override
            public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            // RegistryProtocol不构造过滤器链路
            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
            }
            Invoker<T> invoker = protocol.refer(type, url);


            // ConsumerContextFilter -> ActiveLimitFilter -> FutureFilter -> MonitorFilter -> DubboInvoker
            return buildInvokerChain(invoker, Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
            }
            }

            ActiveLimitFilter源码分析如果超出最大并发数释放锁等待被唤醒,被唤醒后检查方法是否超时,如果超时则抛出异常

              @Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)
              public class ActiveLimitFilter implements Filter {


              @Override
              public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
              URL url = invoker.getUrl();


              // 方法名
              String methodName = invocation.getMethodName();


              // 最大并发数
              int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);


              // 当前并发数
              RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());


              // 超出并发数
              if (!RpcStatus.beginCount(url, methodName, max)) {
              long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
              long start = System.currentTimeMillis();
              long remain = timeout;
              synchronized (count) {


              // 循环判断是否超出并发数
              while (!RpcStatus.beginCount(url, methodName, max)) {
              try {
              // 超出并发数放弃锁等待被唤醒
              count.wait(remain);
              } catch (InterruptedException e) {


              }
              // 等待时间超过方法超时时间抛出异常
              long elapsed = System.currentTimeMillis() - start;
              remain = timeout - elapsed;
              if (remain <= 0) {
              throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
              + invoker.getInterface().getName() + ", method: "
              + invocation.getMethodName() + ", elapsed: " + elapsed
              + ", timeout: " + timeout + ". concurrent invokes: " + count.getActive()
              + ". max concurrent invoke limit: " + max);
              }
              }
              }
              }
              boolean isSuccess = true;
              long begin = System.currentTimeMillis();
              try {
              // 执行下一个链路节点
              return invoker.invoke(invocation);
              } catch (RuntimeException t) {
              isSuccess = false;
              throw t;
              } finally {
              // 增加当前并发数
              RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
              if (max > 0) {
              synchronized (count) {
              // 唤醒等待线程
              count.notifyAll();
              }
              }
              }
              }
              }


              4 文章总结

              本文我们介绍了生产者和消费者并发控制配置,并且在源码层面分析了并发控制实现原理。我们知道使用DUBBO即使不依赖第三方限流框架也可以实现限流功能。


              长按二维码关注更多精彩文章

              文章转载自JAVA前线,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

              评论