暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

sentinel 中 FlowSlot 实现

dragon元 2020-10-13
460

本章来跟随FlowSlot
的源码,看看流控是如何实现的。

  • FlowSlot.entry
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
    checkFlow(resourceWrapper, context, node, count, prioritized);// $1
    fireEntry(context, resourceWrapper, node, count, prioritized, args);// $2
}

$1.限流校验

$2.执行下一个处理器

  • FlowSlot.checkFlow
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);// $1
}

$1.交给checker
对象做校验。

checker
对象的类型是FlowRuleChecker
。从下面的构造方法也可以看出来。

private final FlowRuleChecker checker;

public FlowSlot() {
    this(new FlowRuleChecker());
}

FlowSlot(FlowRuleChecker checker) {
    AssertUtil.notNull(checker, "flow checker should not be null");
    this.checker = checker;
}

ruleProvider
对象是个Function

private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
    @Override
    public Collection<FlowRule> apply(String resource) {
        // Flow rule map should not be null.
        Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
        return flowRules.get(resource);
    }
};

  • FlowRuleChecker.checkFlow
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    if (ruleProvider == null || resource == null) {
        return;
    }
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());// $1
    if (rules != null) {
        for (FlowRule rule : rules) {
            if (!canPassCheck(rule, context, node, count, prioritized)) {// $2
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}

$1.通过资源名称获取规则集合。

$2.获取到的每个规则都做一遍校验。

  • FlowRuleChecker.canPassCheck
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
    String limitApp = rule.getLimitApp();
    if (limitApp == null) {// $1
        return true;
    }
    if (rule.isClusterMode()) {// $2
        return passClusterCheck(rule, context, node, acquireCount, prioritized);
    }
    return passLocalCheck(rule, context, node, acquireCount, prioritized);// $3
}

$1.limitApp
不会为null
,在绑定规则时会校验,如果没有设置则为default

$2.判断是否为集群模式,是则通过集群方式校验。

$3.否则只校验本地机器的流量。

  • FlowRuleChecker.passLocalCheck
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);// $1
    if (selectedNode == null) {
        return true;
    }
    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);// $2
}

$1.选择一个节点

$2.校验该节点是否能通过

node
对象是记录当前资源的使用量的,根据策略、来源的不同,会得到不同的node
对象,所以在这里需要选择一个正确的node
对象来做校验。

  • FlowRuleChecker.selectNodeByRequesterAndStrategy
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
    // The limit app should not be empty.
    String limitApp = rule.getLimitApp();// 调用来源
    int strategy = rule.getStrategy();// 调用关系限流策略
    String origin = context.getOrigin();// 上下来源 默认为 "" 
    if (limitApp.equals(origin) && filterOrigin(origin)) {// $1
        // 调用关系限流策略为直接,则返回源节点
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            // Matches limit origin, return origin statistic node.
            return context.getOriginNode();
        }
        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {// $2
        // 调用关系限流策略为直接,则返回集群节点
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            // Return the cluster node.
            return node.getClusterNode();
        }
        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) 
        && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) { // $3
        // 调用关系限流策略为直接,则返回源节点
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            return context.getOriginNode();
        }
        return selectReferenceNode(rule, context, node);
    }
    return null;
}

$1.调用来源和上下文来源一致,并且不等于default
other

$2.limitApp
等于default
则进入。

$3.limitApp
等于other
则进入,并且配置的规则当中存在limitApp=origin
值的。

通过演示代码的配置可知,在这个方法中会进入$2
处的判断,所以这里返回的是ClusterNode

在了解了selectNodeByRequesterAndStrategy
方法之后,继续之前的代码rule.getRater().canPass(selectedNode, acquireCount, prioritized);

通过rule.getRater()
方法获取流量控制器。

/**
 * The traffic shaping (throttling) controller.
 */

private TrafficShapingController controller;

TrafficShapingController getRater() {
    return controller;
}

TrafficShapingController
有4个子类 DefaultController
RateLimiterController
WarmUpController
WarmUpRateLimiterController
别对应了几种流控效果,而想要解当前型,那么需要了解它是如何创建的

规则创建时,在FlowRuleUtil.buildFlowRuleMap
方法中有这样一段。

TrafficShapingController rater = generateRater(rule);
rule.setRater(rater);

进入generateRater

private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
    if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {// 判断限流类型 RuleConstant.FLOW_GRADE_QPS = 1
        switch (rule.getControlBehavior()) {// 根据控制器行为判断
            case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:// 预热
                return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
                    ColdFactorProperty.coldFactor);
            case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:// 速率限制器
                return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
            case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:// 预热+速率限制器
                return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                    rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
            case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
            default:
                // Default mode or unknown mode: default traffic shaping controller (fast-reject).
        }
    }
    return new DefaultController(rule.getCount(), rule.getGrade());// 返回默认
}

在演示的配置文件中grade
配置的为1,那么就是会进入该方法,但是呢,controlBehavior
又配置为0,所以几个case
都匹配不上,最后这个方法创建的对象是DefaultController

清楚了对象类型,那么就直接进入对应方法。

  • DefaultController.canPass
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    int curCount = avgUsedTokens(node);// $1
    if (curCount + acquireCount > count) {// $2
        if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {// $3
            long currentTime;
            long waitInMs;
            currentTime = TimeUtil.currentTimeMillis();
            waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
            if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                node.addOccupiedPass(acquireCount);
                sleep(waitInMs);
                // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                throw new PriorityWaitException(waitInMs);
            }
        }
        return false;
    }
    return true;
}

$1.获取使用数量

$2.count
为在配置文件中配置的阈值,使用数量加本次使用数量,判断是否超过阈值。

$3.prioritized
默认传入false
,所以该方法默认不会进入,作用就是看看是否可以借用一个窗口,这里不知道什么意思不要急,可以看下一章。会分析sentinel
的滑动窗口统计机制。

canPass
方法中使用avgUsedTokens
方法获取当前资源的使用量。

  • avgUsedTokens
private int avgUsedTokens(Node node) {
    if (node == null) {
        return DEFAULT_AVG_USED_TOKENS;
    }
    return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}

通过node
对象内部的方法,判断限流类型,获取qps
或是threadNum
的使用量。

回到checkFlow
方法中,canPassCheck
方法如果返回为false
则会抛出FlowException
异常,以此来实现限流。

if (!canPassCheck(rule, context, node, count, prioritized)) {
    throw new FlowException(rule.getLimitApp(), rule);
}

流量控制的实现就这么多了,当然还有一些没有分析的,不同的流控效果,会有不同的TrafficShapingController
对象,想了解的可以自己看看其余几个类的实现。


文章转载自dragon元,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论