暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Dubbo限流源码分析

codeImport 2020-02-18
542

Dubbo的配置丰富,功能强大,但是网上的教程多为复制粘贴,鱼龙混杂。本着不轻信,不盲从,眼见为实的态度,本系列教程将从源代码的角度分析各配置项的作用。

1.Dubbo限流该怎么设置

dubbo有多种限流方式,可以使用以下参数进行多维度的限流:

1.accepts:服务端最大可接受连接数,可以理解为可以接受的最大消费者数;2.connections:每个Reference开启的连接数;3.actives:消费端控制每个接口的最大并发数;4.executes:服务端控制每个接口的最大并发数;

2.accepts

2.1.accepts是什么?

Provider配置最大可接受连接数,这是项目级别设置。

比如一个Provider设置了accepts=2,该Provider3个消费者分别为C1,C2,C3。

假如这3个消费者的启动顺序为C1,C2,C3,则C3会无法启动,因为服务已经达到了

最大连接数限制;

2.2.accepts该如何配置? 

dubbo.provider.accepts=3

2.3.accepts源码分析

在消费者端启动时,会生成一条Netty连接,服务端此时会判断服务端接受的连接数是否已经大于accepts,如果大于,则会拒绝该连接。

    //此代码在AbstractServer中
    public void connected(Channel ch) throws RemotingException {
        if (!this.isClosing() && !this.isClosed()) {
           Collection<Channel> channels = this.getChannels();
           //判断服务端连接数
           if (this.accepts > 0 && channels.size() > this.accepts) {
               logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + this.accepts);
               ch.close();
           } else {
               super.connected(ch);
}
        } else {
           logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
           ch.close();
}
}
   

3.connections

3.1.connectons是什么?

每个Reference开启的长连接数,默认是0,表示所有的Reference共享同一条连接;如果大于0,则单独为此Reference设置connections条长连接。

比如同一个项目有3个reference: 

@Reference(connections=3) HelloService helloService; 

@Reference TestService testService; 

@Reference FooService fooService; 

则该项目会生成4条连接,其中helloService有3条,testService与fooService共用一条

3.2.connections该如何使用

@Reference(connections=3) 

3.3.connections源码分析

在Reference获取ExchangeClient的时候,会判断Reference是否设置了connections参数,如果是则生成相应的ExchangeClient(每个ExchangeClient包含一条连接)

    //该段代码在DubboProtocol中
    //该方法会在进行refer的时候调用
    private ExchangeClient[] getClients(URL url) {
// whether to share connection
        boolean useShareConnect = false;
        int connections = url.getParameter(CONNECTIONS_KEY, 0);
        List<ReferenceCountExchangeClient> shareClients = null;
        // if not configured, connection is shared, otherwise, one connection for one service
        if (connections == 0) {
useShareConnect = true;
            //如果connections为0,则获取共享的连接,共享的连接数为1
            String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
            connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
            shareClients = getSharedClient(url, connections);
        }
        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (useShareConnect) {
clients[i] = shareClients.get(i);
            } else {
//如果设置了connections则新生成足够的连接
clients[i] = initClient(url);
            }
}
        return clients;
}

4.actives

4.1.actives是什么?

Consumer端每个接口的最大并发数,默认是0,如果是0则没有限制。 

4.2.actives该如何使用? 

@Reference(actives = 3)

4.3.actives源码分析 

如果actives大于0,则在Consumer端调用链会加入ActiveLimitFilter过滤器,每次调用前都会判断该接口是否超出了最大并发数,如果超过会等待timeout时间,超时会抛出异常。

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0);
        final RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
        if (!RpcStatus.beginCount(url, methodName, max)) {
            long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), TIMEOUT_KEY, 0);
            long start = System.currentTimeMillis();
            long remain = timeout;
            synchronized (rpcStatus) {
                while (!RpcStatus.beginCount(url, methodName, max)) {
                    try {
rpcStatus.wait(remain);
                    } catch (InterruptedException e) {
// ignore
}
                    long elapsed = System.currentTimeMillis() - start;
                    remain = timeout - elapsed;
                    if (remain <= 0) {
                        throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION,
"Waiting concurrent invoke timeout in client-side for service: " +
invoker.getInterface().getName() + ", method: " + invocation.getMethodName() +
", elapsed: " + elapsed + ", timeout: " + timeout + ". concurrent invokes: " +
rpcStatus.getActive() + ". max concurrent invoke limit: " + max);
}
}
}
}


        invocation.setAttachment(ACTIVELIMIT_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));


        return invoker.invoke(invocation);
}

5.executes

5.1.executes是什么?

Provider端每个接口的最大并发数,默认是0,如果是0则没有限制。

5.2.executes如何使用?

@Service(executes = 3)

5.3.executes源码分析 

如果executes大于0,则在Provider端调用链会加入ExecuteLimitFilter过滤器,每次Provider接到请求,都会将该接口的并发数+1并判断是否大于executes,如果是则直接抛出RpcException。

@Activate(group = CommonConstants.PROVIDER, value = EXECUTES_KEY)
public class ExecuteLimitFilter extends ListenableFilter {
    private static final String EXECUTELIMIT_FILTER_START_TIME = "execugtelimit_filter_start_time";
    public ExecuteLimitFilter() {
        super.listener = new ExecuteLimitListener();
    }
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        int max = url.getMethodParameter(methodName, EXECUTES_KEY, 0);
        //Dubbo会给每个接口维护一个当时的并发调用数,如果executes大于0,则每次都会判断并发数是否超出了限制
        if (!RpcStatus.beginCount(url, methodName, max)) {
            throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION,
"Failed to invoke method " + invocation.getMethodName() + " in provider " +
url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max +
"\" /> limited.");
}


invocation.setAttachment(EXECUTELIMIT_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
        try {
return invoker.invoke(invocation);
        } catch (Throwable t) {
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
} else {
throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
}
}
}

6.数据流程图

如果对上面提到的概念比较模糊,可以看下面这张流程图。


References

[1]
 并发控制: http://dubbo.apache.org/zh-cn/docs/user/demos/concurrency-control.html
[2]
 连接控制: http://dubbo.apache.org/zh-cn/docs/user/demos/config-connections.html

近期热文

Dubbo配置参数源码解析-group

Dubbo配置参数源码解析-stub

Dubbo配置参数源码解析-mock

文章转载自codeImport,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论