IT徐胖子原创本文未授权请勿转载
1 文章概述
生产者和消费者提供了并发控制配置,通过并发控制配置项可以实现限流功能,从而有效进行系统保护。本文我们介绍生产者和消费者并发控制怎样配置并且在源码层面分析并发控制实现原理。
2 生产者
2.1 配置方式
HelloService服务每个方法在每个生产节点执行并发数不超过100
<beans><dubbo:registry address="zookeeper://127.0.0.1:2181" ><dubbo:protocol name="dubbo" port="8888" ><dubbo:service executes="100" interface="com.itxpz.dubbo.demo.provider.HelloService" ref="helloService" ></beans>
2.2 源码分析
ExecuteLimitFilter过滤器是生产者并发控制核心,我们分析构建过滤器链路源码
public class ProtocolFilterWrapper implements Protocol {private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {Invoker<T> last = invoker;List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);if (!filters.isEmpty()) {for (int i = filters.size() - 1; i >= 0; i--) {final Filter filter = filters.get(i);final Invoker<T> next = last;// 构造一个简化Invokerlast = new Invoker<T>() {@Overridepublic Class<T> getInterface() {return invoker.getInterface();}@Overridepublic URL getUrl() {return invoker.getUrl();}@Overridepublic boolean isAvailable() {return invoker.isAvailable();}@Overridepublic Result invoke(Invocation invocation) throws RpcException {// 构造过滤器链路Result result = filter.invoke(next, invocation);if (result instanceof AsyncRpcResult) {AsyncRpcResult asyncResult = (AsyncRpcResult) result;asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation));return asyncResult;} else {return filter.onResponse(result, invoker, invocation);}}@Overridepublic void destroy() {invoker.destroy();}@Overridepublic String toString() {return invoker.toString();}};}}return last;}@Overridepublic <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {// RegistryProtocol不构造过滤器链if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {return protocol.export(invoker);}// EchoFilter > ClassloaderFilter > GenericFilter > ContextFilter > ExecuteLimitFilter >// TraceFilter > TimeoutFilter > MonitorFilter > ExceptionFilter > AbstractProxyInvokerInvoker<T> invokerChain = buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER);return protocol.export(invokerChain);}}
ExecuteLimitFilter源码分析需要注意如果超出最大并发数直接抛出异常
@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY)public class ExecuteLimitFilter implements Filter {@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {URL url = invoker.getUrl();// 方法名String methodName = invocation.getMethodName();// 最大并发数int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);// 超出最大并发数抛出异常if (!RpcStatus.beginCount(url, methodName, max)) {throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " +url + ", cause: The service using threads greater than <dubbo:service executes='" + max + "'/> limited.");}long begin = System.currentTimeMillis();boolean isSuccess = true;try {// 执行下一个链路节点return invoker.invoke(invocation);} catch (Throwable t) {isSuccess = false;if (t instanceof RuntimeException) {throw (RuntimeException) t;} else {throw new RpcException("unexpected exception when ExecuteLimitFilter", t);}} finally {// 增加当前并发数RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);}}
3 消费者
3.1 配置方式
HelloService接口每个方法在每个消费节点执行并发数不超过100
<beans><dubbo:application name="xpz-consumer" ><dubbo:registry address="zookeeper://127.0.0.1:2181" ><dubbo:reference actives="100" id="helloService" interface="com.itxpz.dubbo.demo.provider.HelloService" /></beans>
3.2 源码分析
ActiveLimitFilter过滤器是消费者并发控制核心,我们分析构建过滤器链路源码
public class ProtocolFilterWrapper implements Protocol {private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {// invoker = DubboInvokerInvoker<T> last = invoker;// 查询符合条件过滤器列表List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);if (!filters.isEmpty()) {for (int i = filters.size() - 1; i >= 0; i--) {final Filter filter = filters.get(i);final Invoker<T> next = last;// 构造一个简化Invokerlast = new Invoker<T>() {@Overridepublic Class<T> getInterface() {return invoker.getInterface();}@Overridepublic URL getUrl() {return invoker.getUrl();}@Overridepublic boolean isAvailable() {return invoker.isAvailable();}@Overridepublic Result invoke(Invocation invocation) throws RpcException {// 构造过滤器链路Result result = filter.invoke(next, invocation);if (result instanceof AsyncRpcResult) {AsyncRpcResult asyncResult = (AsyncRpcResult) result;asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation));return asyncResult;} else {return filter.onResponse(result, invoker, invocation);}}@Overridepublic void destroy() {invoker.destroy();}@Overridepublic String toString() {return invoker.toString();}};}}return last;}@Overridepublic <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {// RegistryProtocol不构造过滤器链路if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {return protocol.refer(type, url);}Invoker<T> invoker = protocol.refer(type, url);// ConsumerContextFilter -> ActiveLimitFilter -> FutureFilter -> MonitorFilter -> DubboInvokerreturn buildInvokerChain(invoker, Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);}}
ActiveLimitFilter源码分析如果超出最大并发数释放锁等待被唤醒,被唤醒后检查方法是否超时,如果超时则抛出异常
@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)public class ActiveLimitFilter implements Filter {@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {URL url = invoker.getUrl();// 方法名String methodName = invocation.getMethodName();// 最大并发数int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);// 当前并发数RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());// 超出并发数if (!RpcStatus.beginCount(url, methodName, max)) {long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);long start = System.currentTimeMillis();long remain = timeout;synchronized (count) {// 循环判断是否超出并发数while (!RpcStatus.beginCount(url, methodName, max)) {try {// 超出并发数放弃锁等待被唤醒count.wait(remain);} catch (InterruptedException e) {}// 等待时间超过方法超时时间抛出异常long elapsed = System.currentTimeMillis() - start;remain = timeout - elapsed;if (remain <= 0) {throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "+ invoker.getInterface().getName() + ", method: "+ invocation.getMethodName() + ", elapsed: " + elapsed+ ", timeout: " + timeout + ". concurrent invokes: " + count.getActive()+ ". max concurrent invoke limit: " + max);}}}}boolean isSuccess = true;long begin = System.currentTimeMillis();try {// 执行下一个链路节点return invoker.invoke(invocation);} catch (RuntimeException t) {isSuccess = false;throw t;} finally {// 增加当前并发数RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);if (max > 0) {synchronized (count) {// 唤醒等待线程count.notifyAll();}}}}}
4 文章总结
本文我们介绍了生产者和消费者并发控制配置,并且在源码层面分析了并发控制实现原理。我们知道使用DUBBO即使不依赖第三方限流框架也可以实现限流功能。
长按二维码关注更多精彩文章
文章转载自JAVA前线,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




