IT徐胖子原创本文未授权请勿转载
1 文章概述
DUBBO有很多地方可以配置超时时间,可以配置在消费者,可以配置在生产者,可以配置为方法级别,可以配置为接口级别,还可以配置为全局级别,根据DUBBO官方文档介绍这些配置优先级分为两个维度:
第一优先级:方法级 > 接口级 > 全局级第二优先级:消费者 > 生产者
本文从源码层面对超时机制进行分析,我们首先分析优先级如何生效,然后再分析超时机制在消费者和生产者分别如何实现。
2 配置优先级
2.1 消费者 > 生产者
配置生产者接口级别超时时间888毫秒
<beans><dubbo:registry address="zookeeper://127.0.0.1:2181" ><dubbo:protocol name="dubbo" port="20880" ><dubbo:service timeout="888" interface="com.itxpz.dubbo.demo.provider.HelloService" ref="helloService" ></beans>
配置消费者接口级别超时时间999毫秒
<beans><dubbo:application name="xpz-consumer" ><dubbo:registry address="zookeeper://127.0.0.1:2181" ><dubbo:reference timeout="999" id="helloService" interface="com.itxpz.dubbo.demo.provider.HelloService" ></beans>
生产者首先注册服务信息至注册中心,消费者从注册中心订阅服务信息,在获取到生产者服务信息后,会将这些配置与消费者配置进行融合,核心在消费者订阅信息后会将服务信息转化为Invokers这一段代码
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {private Map<String, Invoker<T>> toInvokers(List<URL> urls) {Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();if (urls == null || urls.isEmpty()) {return newUrlInvokerMap;}for (URL providerUrl : urls) {// providerUrl是从注册中心订阅的生产者配置// providerUrl=dubbo://x.x.x.x:20880/com.itxpz.dubbo.demo.provider.HelloService?anyhost=true&application=xpz-provider&dubbo=2.0.2&generic=false&interface=com.itxpz.dubbo.demo.provider.HelloService&methods=sayHello&pid=16736&release=2.7.0&side=provider&timeout=888// mergeUrl方法进行多维度参数融合// 本文只分析消费者和生产者参数融合URL url = mergeUrl(providerUrl);}}}
分析消费者和生产者参数融合代码
public class ClusterUtils {public static URL mergeUrl(URL remoteUrl, Map<String, String> localMap) {// 消费者参数localMap = {side=consumer, register.ip=x.x.x.x, methods=sayHello, release=2.7.0, qos.port=55555, dubbo=2.0.2, pid=16904, interface=com.itxpz.dubbo.demo.provider.HelloService, qos.enable=true, timeout=999, application=xpz-consumer, qos.accept.foreign.ip=false, timestamp=123}// 生产者参数remoteMap = {side=provider, methods=sayHello, release=2.7.0, dubbo=2.0.2, pid=16736, interface=com.itxpz.dubbo.demo.provider.HelloService, generic=false, timeout=888, application=xpz-provider, anyhost=true, timestamp=123}Map<String, String> remoteMap = remoteUrl.getParameters();Map<String, String> map = new HashMap<String, String>();// 消费者配置不为空则全部赋值至结果对象if (localMap != null && localMap.size() > 0) {String remoteGroup = map.get(Constants.GROUP_KEY);map.put(Constants.GROUP_KEY, remoteGroup);map.putAll(localMap);}// 生产者配置不为空则设置一些信息if (remoteMap != null && remoteMap.size() > 0) {// 省略代码}// 超时时间已经从888毫秒变为999毫秒// dubbo://x.x.x.x:20880/com.itxpz.dubbo.demo.provider.HelloService?anyhost=true&application=xpz-consumer&dubbo=2.0.2&generic=false&group=&interface=com.itxpz.dubbo.demo.provider.HelloService&methods=sayHello&pid=16284&qos.accept.foreign.ip=false&qos.enable=true&qos.port=55555&release=2.7.0&remote.application=xpz-provider&side=consumer&timeout=999URL result = remoteUrl.clearParameters().addParameters(map);return result;}}
2.2 方法级 > 接口级
配置消费者接口级别超时时间999毫秒
<beans><dubbo:application name="xpz-consumer" ><dubbo:registry address="zookeeper://127.0.0.1:2181" ><dubbo:reference timeout="999" id="helloService" interface="com.itxpz.dubbo.demo.provider.HelloService" ></beans>
配置生产者方法级别超时时间1111毫秒
<beans><dubbo:registry address="zookeeper://127.0.0.1:2181" ><dubbo:protocol name="dubbo" port="20880" ><dubbo:service interface="com.itxpz.dubbo.demo.provider.HelloService" ref="helloService"><dubbo:method name="sayHello" timeout="1111" ></dubbo:service></beans>
首先观察经过参数融合后URL
public class ClusterUtils {public static URL mergeUrl(URL remoteUrl, Map<String, String> localMap) {// 消费者参数localMap = {side=consumer, register.ip=x.x.x.x, methods=sayHello, release=2.7.0, qos.port=55555, dubbo=2.0.2, pid=15436, interface=com.itxpz.dubbo.demo.provider.HelloService, qos.enable=true, timeout=999, application=xpz-consumer, qos.accept.foreign.ip=false, timestamp=123}// 生产者参数remoteMap = {side=provider, methods=sayHello, release=2.7.0 dubbo=2.0.2, pid=16260,interface = com.itxpz.dubbo.demo.provider.HelloService, sayHello.timeout = 1111, generic = false, application = xpz - provider, anyhost = true, timestamp = 123}Map<String, String> remoteMap = remoteUrl.getParameters();Map<String, String> map = new HashMap<String, String>();// 消费者配置不为空则全部赋值至结果对象if (localMap != null && localMap.size() > 0) {String remoteGroup = map.get(Constants.GROUP_KEY);map.put(Constants.GROUP_KEY, remoteGroup);map.putAll(localMap);}// 生产者配置不为空则设置一些信息if (remoteMap != null && remoteMap.size() > 0) {// 省略代码}// 超时存在两个配置sayHello.timeout=1111、timeout=999// dubbo://x.x.x.x:20880/com.itxpz.dubbo.demo.provider.HelloService?anyhost=true&application=xpz-consumer&dubbo=2.0.2&generic=false&group=&interface=com.itxpz.dubbo.demo.provider.HelloService&methods=sayHello&pid=5456&qos.accept.foreign.ip=false&qos.enable=true&qos.port=55555&release=2.7.0&remote.application=xpz-provider&sayHello.timeout=1111&side=consumer&timeout=999URL result = remoteUrl.clearParameters().addParameters(map);return result;}}
我们看到关于超时存在两个配置,优先级在消费者发起远程调用时体现
public class DubboInvoker<T> extends AbstractInvoker<T> {@Overrideprotected Result doInvoke(final Invocation invocation) throws Throwable {RpcInvocation inv = (RpcInvocation) invocation;final String methodName = RpcUtils.getMethodName(invocation);inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());inv.setAttachment(Constants.VERSION_KEY, version);ExchangeClient currentClient;if (clients.length == 1) {currentClient = clients[0];} else {currentClient = clients[index.getAndIncrement() % clients.length];}try {boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv);boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);// 获取超时时间方法体现优先级// getUrl() = dubbo://x.x.x.x:20880/com.itxpz.dubbo.demo.provider.HelloService?anyhost=true&application=xpz-consumer&dubbo=2.0.2&generic=false&group=&interface=com.itxpz.dubbo.demo.provider.HelloService&methods=sayHello&pid=5456&qos.accept.foreign.ip=false&qos.enable=true&qos.port=55555&release=2.7.0&remote.application=xpz-provider&sayHello.timeout=1111&side=consumer&timeout=999int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);if (isOneway) {boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);currentClient.send(inv, isSent);RpcContext.getContext().setFuture(null);return new RpcResult();} else if (isAsync) {ResponseFuture future = currentClient.request(inv, timeout);FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);RpcContext.getContext().setFuture(futureAdapter);Result result;if (isAsyncFuture) {result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);} else {result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);}return result;} else {RpcContext.getContext().setFuture(null);// currentClient.request方法发起远程调用// get方法进行超时判断return (Result) currentClient.request(inv, timeout).get();}} catch (TimeoutException e) {throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} catch (RemotingException e) {throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);}}}public int getMethodParameter(String method, String key, int defaultValue) {// 获取sayHello.timeout属性不为空则直接返回// sayHello.timeout正是由方法级别生成优先级最高String methodKey = method + "." + key;Number n = getNumbers().get(methodKey);if (n != null) {return n.intValue();}// 获取timeout属性如果为空则返回默认值String value = getMethodParameter(method, key);if (StringUtils.isEmpty(value)) {return defaultValue;}int i = Integer.parseInt(value);getNumbers().put(methodKey, i);return i;}
3 消费者超时机制
public class DubboInvoker<T> extends AbstractInvoker<T> {@Overrideprotected Result doInvoke(final Invocation invocation) throws Throwable {try {// get方法进行超时判断// currentClient.request方法发起远程调用return (Result) currentClient.request(inv, timeout).get();} catch (TimeoutException e) {throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} catch (RemotingException e) {throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);}}}
DefaultFuture尝试接收响应结果,如果阻塞达到超时时间响应结果还是为空,那么消费者会抛出超时异常
public class DefaultFuture implements ResponseFuture {@Overridepublic Object get(int timeout) throws RemotingException {if (timeout <= 0) {timeout = Constants.DEFAULT_TIMEOUT;}// 如果response对象为空if (!isDone()) {long start = System.currentTimeMillis();lock.lock();try {while (!isDone()) {// 放弃锁并使当前线程等待,直到发出信号或中断它,或者达到超时时间done.await(timeout, TimeUnit.MILLISECONDS);if (isDone()) {break;}if(System.currentTimeMillis() - start > timeout) {break;}}} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}// 如果response对象仍然为空则抛出超时异常if (!isDone()) {throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));}}return returnFromResponse();}@Overridepublic boolean isDone() {return response != null;}private void doReceived(Response res) {lock.lock();try {// 接收到服务器响应赋值responseresponse = res;if (done != null) {// 唤醒get方法中处于等待的代码块done.signal();}} finally {lock.unlock();}if (callback != null) {invokeCallback(callback);}}}
4 生产者超时机制
生产者超时机制体现在TimeoutFilter过滤器,需要注意生产者超时只记录一条日志,流程继续进行不抛出异常
@Activate(group = Constants.PROVIDER)public class TimeoutFilter implements Filter {private static final Logger logger = LoggerFactory.getLogger(TimeoutFilter.class);@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {long start = System.currentTimeMillis();Result result = invoker.invoke(invocation);long elapsed = System.currentTimeMillis() - start;// 生产者配置int timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), "timeout", Integer.MAX_VALUE);// 如果超时只记录一条日志流程继续进行if (invoker.getUrl() != null && elapsed > timeout ) {if (logger.isWarnEnabled()) {logger.warn("invoke time out method: " + invocation.getMethodName() + " arguments: " + Arrays.toString(invocation.getArguments()) + " , url is " + invoker.getUrl() + ", invoke elapsed " + elapsed + " ms.");}}return result;}}
5 合理设置超时时间
我们设想这样一种场景:业务系统调用订单中心服务查询订单信息,由于业务系统没有合理设置超时时间,用户长时间得不到响应会反复查询订单信息,所以无论上游系统还是下游系统都可能因为流量激增导致系统崩溃,这就是系统雪崩。
消费者需要了解生产者服务大概率响应时间,设置消费者超时时间略长于大概率响应时间。如果无需同步响应可以采用Failback集群容错策略或者异步调用。消费者和生产者都需要做好限流、降级、熔断策略保护系统,防止出现系统雪崩这类严重问题。
长按二维码关注更多精彩文章




