本章来跟随FlowSlot
的源码,看看流控是如何实现的。
FlowSlot.entry
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);// $1
fireEntry(context, resourceWrapper, node, count, prioritized, args);// $2
}
$1.限流校验
$2.执行下一个处理器
FlowSlot.checkFlow
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);// $1
}
$1.交给checker
对象做校验。
checker
对象的类型是FlowRuleChecker
。从下面的构造方法也可以看出来。
private final FlowRuleChecker checker;
public FlowSlot() {
this(new FlowRuleChecker());
}
FlowSlot(FlowRuleChecker checker) {
AssertUtil.notNull(checker, "flow checker should not be null");
this.checker = checker;
}
ruleProvider
对象是个Function
private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
@Override
public Collection<FlowRule> apply(String resource) {
// Flow rule map should not be null.
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
return flowRules.get(resource);
}
};
FlowRuleChecker.checkFlow
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());// $1
if (rules != null) {
for (FlowRule rule : rules) {
if (!canPassCheck(rule, context, node, count, prioritized)) {// $2
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
$1.通过资源名称获取规则集合。
$2.获取到的每个规则都做一遍校验。
FlowRuleChecker.canPassCheck
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
String limitApp = rule.getLimitApp();
if (limitApp == null) {// $1
return true;
}
if (rule.isClusterMode()) {// $2
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
return passLocalCheck(rule, context, node, acquireCount, prioritized);// $3
}
$1.limitApp
不会为null
,在绑定规则时会校验,如果没有设置则为default
。
$2.判断是否为集群模式,是则通过集群方式校验。
$3.否则只校验本地机器的流量。
FlowRuleChecker.passLocalCheck
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);// $1
if (selectedNode == null) {
return true;
}
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);// $2
}
$1.选择一个节点
$2.校验该节点是否能通过
node
对象是记录当前资源的使用量的,根据策略、来源的不同,会得到不同的node
对象,所以在这里需要选择一个正确的node
对象来做校验。
FlowRuleChecker.selectNodeByRequesterAndStrategy
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
// The limit app should not be empty.
String limitApp = rule.getLimitApp();// 调用来源
int strategy = rule.getStrategy();// 调用关系限流策略
String origin = context.getOrigin();// 上下来源 默认为 ""
if (limitApp.equals(origin) && filterOrigin(origin)) {// $1
// 调用关系限流策略为直接,则返回源节点
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// Matches limit origin, return origin statistic node.
return context.getOriginNode();
}
return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {// $2
// 调用关系限流策略为直接,则返回集群节点
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// Return the cluster node.
return node.getClusterNode();
}
return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
&& FlowRuleManager.isOtherOrigin(origin, rule.getResource())) { // $3
// 调用关系限流策略为直接,则返回源节点
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return context.getOriginNode();
}
return selectReferenceNode(rule, context, node);
}
return null;
}
$1.调用来源和上下文来源一致,并且不等于default
或other
。
$2.limitApp
等于default
则进入。
$3.limitApp
等于other
则进入,并且配置的规则当中存在limitApp=origin
值的。
通过演示代码的配置可知,在这个方法中会进入$2
处的判断,所以这里返回的是ClusterNode
。
在了解了selectNodeByRequesterAndStrategy
方法之后,继续之前的代码rule.getRater().canPass(selectedNode, acquireCount, prioritized);
。
通过rule.getRater()
方法获取流量控制器。
/**
* The traffic shaping (throttling) controller.
*/
private TrafficShapingController controller;
TrafficShapingController getRater() {
return controller;
}
TrafficShapingController
有4个子类 DefaultController
,RateLimiterController
,WarmUpController
,WarmUpRateLimiterController
,分别对应了几种流控效果,而想要了解当前对象的类型,那么需要了解它是如何创建的。
规则创建时,在FlowRuleUtil.buildFlowRuleMap
方法中有这样一段。
TrafficShapingController rater = generateRater(rule);
rule.setRater(rater);
进入generateRater
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {// 判断限流类型 RuleConstant.FLOW_GRADE_QPS = 1
switch (rule.getControlBehavior()) {// 根据控制器行为判断
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:// 预热
return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:// 速率限制器
return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:// 预热+速率限制器
return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
default:
// Default mode or unknown mode: default traffic shaping controller (fast-reject).
}
}
return new DefaultController(rule.getCount(), rule.getGrade());// 返回默认
}
在演示的配置文件中grade
配置的为1,那么就是会进入该方法,但是呢,controlBehavior
又配置为0,所以几个case
都匹配不上,最后这个方法创建的对象是DefaultController
。
清楚了对象类型,那么就直接进入对应方法。
DefaultController.canPass
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
int curCount = avgUsedTokens(node);// $1
if (curCount + acquireCount > count) {// $2
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {// $3
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
$1.获取使用数量
$2.count
为在配置文件中配置的阈值,使用数量加本次使用数量,判断是否超过阈值。
$3.prioritized
默认传入false
,所以该方法默认不会进入,作用就是看看是否可以借用一个窗口,这里不知道什么意思不要急,可以看下一章。会分析sentinel
的滑动窗口统计机制。
在canPass
方法中使用avgUsedTokens
方法获取当前资源的使用量。
avgUsedTokens
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
通过node
对象内部的方法,判断限流类型,获取qps
或是threadNum
的使用量。
回到checkFlow
方法中,canPassCheck
方法如果返回为false
则会抛出FlowException
异常,以此来实现限流。
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
流量控制的实现就这么多了,当然还有一些没有分析的,不同的流控效果,会有不同的TrafficShapingController
对象,想了解的可以自己看看其余几个类的实现。




