在上一章中,FlowSlot
限流校验中是获取node
中的使用量加一来判断是否触发限流,没有在方法中增加使用量,由此可知,统计使用量是其他的solt
做的工作。
所以跟随这章可以思考一下这两个问题:
Sentinel
底层是如何计算线上系统实时QPS的?Sentinel
底层滑动时间窗限流算法怎么实现的?
定位到StatisticSlot
类。
StatisticSlot
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args);// 执行下一个 solt
// Request passed, add thread count and pass count.
node.increaseThreadNum();// $1
node.addPassRequest(count);// $2
if (context.getCurEntry().getOriginNode() != null) {// $3
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {// $4
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
...
} catch (BlockException e) {
...
} catch (Throwable e) {
...
}
}
$1.增加线程使用量。
$2.增加qps
使用量。
$3.如果存在调用源的node
对象,则增加调用源node
对象的使用量,默认是为空的。
$4.如果为入站流量,则全局统计增加使用量。
这一段代码有点长了,就先不把异常处理贴出来。entry
方法首先调用fireEntry
方法,也就是将处理先交给下个一个处理器,之后再增加使用量,等到后续处理器处理完毕,再增加使用量。
从$1、$2
代码看来,是使用了入参中的node
对象,而node
对象内部做了什么操作呢?首先了解node
对象是从哪里创建的。根据之前文章 sentinel
的实现 内容可以知道,处理器链的执行链路为 NodeSelectorSlot
-> ClusterBuilderSlot
-> LogSlot
-> StatisticSlot
-> ParamFlowSlot
-> SystemSlot
-> AuthoritySlot
-> FlowSlot
-> DegradeSlot
,而NodeSelectorSlot
的作用便是创建node
对象。
NodeSelectorSlot
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
DefaultNode node = map.get(context.getName());// $1
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);// $2
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
}
// Build invocation tree
((DefaultNode)context.getLastNode()).addChild(node);
}
}
context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
$1.通过上下文名称获取,上下文名称有2个WEB_SERVLET_CONTEXT_NAME
、CONTEXT_DEFAULT_NAME
。关闭了sentinel
的filter
功能,名称为CONTEXT_DEFAULT_NAME
反之WEB_SERVLET_CONTEXT_NAME
。
$2.如果不存在则创建对象,类型为DefaultNode
。
获取到node
对象之后,设置到上下文信息中,并将node
设置为下一个solt
的入参,下面来看看统计的实现。
线程使用量的统计。
DefaultNode.increaseThreadNum
@Override
public void increaseThreadNum() {
super.increaseThreadNum();// $1
this.clusterNode.increaseThreadNum();// $2
}
$1.调用父类的增加线程使用量方法。
$2.调用clusterNode
的增加线程使用量方法。
DefaultNode
继承自StatisticNode
类。
StatisticNode.increaseThreadNum
/**
* The counter for thread count.
*/
private AtomicInteger curThreadNum = new AtomicInteger(0);
@Override
public void increaseThreadNum() {
curThreadNum.incrementAndGet();
}
做的操作还是比较简单的,对一个原子类进行自增。
而clusterNode
是在什么时候被创建的呢。看到ClusterBuilderSlot.entry
方法
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
if (clusterNode == null) {// $1
synchronized (lock) {
if (clusterNode == null) {
// Create the cluster node.
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
node.setClusterNode(clusterNode);// $2
...
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
$1.不存在则创建,clusterNode
对象为成员变量,所以只会初始化一次,创建的对象类型为ClusterNode
。
$2.设置进node
对象中。
ClusterNode
和DefaultNode
都继承自StatisticNode
。ClusterNode
并没有重写父类的increaseThreadNum
方法,所以this.clusterNode.increaseThreadNum();
也会直接调用它父类的方法。
StatisticNode.increaseThreadNum
方法就在上面,就不重复了。
增加线程使用量的代码就这些了,并没有难以理解的东西。而在目标方法执行完之后,会对线程的使用量做减的操作。在StatisticSlot.exit
方法中
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
DefaultNode node = (DefaultNode)context.getCurNode();
if (context.getCurEntry().getError() == null) {
// Calculate response time (max RT is TIME_DROP_VALVE).
long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
if (rt > Constants.TIME_DROP_VALVE) {
rt = Constants.TIME_DROP_VALVE;
}
// Record response time and success count.
node.addRtAndSuccess(rt, count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
}
node.decreaseThreadNum();// $1
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().decreaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
Constants.ENTRY_NODE.decreaseThreadNum();
}
} else {
// Error may happen.
}
...
}
$1.减去线程的使用量。
每个slot
都实现了exit
方法,会按照处理链的顺序依次执行一遍。
QPS使用量统计,在这里用到了滑动时间窗统计
qps
计数相对于线程计数会比较麻烦,线程限制数量因为没有时间的限制,所以无论在一个多长的时间段内,只要线程使用数量达到阈值,便触发熔断或限流;而qps
则是在一个时间范围内,当qps
在这个时间范围内达到阈值便触发熔断或限流,来看看sentinel
是如何实现的吧。
DefaultNode.addPassRequest
@Override
public void addPassRequest(int count) {
super.addPassRequest(count);
this.clusterNode.addPassRequest(count);
}
进入StatisticNode.addPassRequest
@Override
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
rollingCounterInSecond
和rollingCounterInMinute
这个对象又是什么呢?看到创建它们的地方。
StatisticNode
/**
*
* 保存最近{@code INTERVAL}秒的统计信息。{@code INTERVAL}被划分为时间跨度 通过给定的{@code sampleCount}。
*/
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);// $1
/**
* 保存最近60秒的统计信息。windowLengthInMs故意设置为1000毫秒,也就是说每一个桶每秒,这样我们就可以得到准确的每秒统计数据。
*/
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
$1.样本数量:SampleCountProperty.SAMPLE_COUNT = 2
,时间间隔:IntervalProperty.INTERVAL = 1000
。
rollingCounterInSecond
是记录秒钟内的,rollingCounterInMinute
是记录分钟内的,限流校验是获取秒钟内记录的,只要关注rollingCounterInSecond
对象即可。
通过代码也可知,IntervalProperty.INTERVAL
配置的是1000,所以统计的是1秒钟内的使用量,也就是qps
默认的时间范围是1秒钟,当然这个也写死了,改不了。
源码上的注释已经将这两个类的行为描述清楚了,下面是这个类的两个构造方法。
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
if (enableOccupy) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}
通过构造可以知道rollingCounterInSecond
的类型为OccupiableBucketLeapArray
,而rollingCounterInMinute
的类型为BucketLeapArray
。
OccupiableBucketLeapArray
顾名思义,可占用的。这里可以留心一下,之后的内容也会说到这个效果,不过这并不是重点,跟着文章了解一下就可以。
BucketLeapArray
和OccupiableBucketLeapArray
都继承了LeapArray<MetricBucket>
,下面是他们的构造方法。
BucketLeapArray
public BucketLeapArray(int sampleCount, int intervalInMs) {
super(sampleCount, intervalInMs);
}
OccupiableBucketLeapArray
public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
// This class is the original "CombinedBucketArray".
super(sampleCount, intervalInMs);
this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);// $1
}
$1.创建可借的BucketLeap
对象,这个对象就是Occupiable
的关键。
这两个类在构造中都调用了父类的构造方法,接下来就是滑动时间窗的实现了。
LeapArray
public LeapArray(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
this.windowLengthInMs = intervalInMs / sampleCount;// $1
this.intervalInMs = intervalInMs;// $2
this.sampleCount = sampleCount;// $3
this.array = new AtomicReferenceArray<>(sampleCount);// $4
}
$1.通过时间间隔和时间窗数量,计算得出时间窗的时间长度。例如rollingCounterInSecond
这个变量的创建,该值就为 1000 / 2 = 500。
$2.记录时间间隔
$3.记录时间窗的数量。
$4.通过传入的样本数量创建AtomicReferenceArray
,创建一个原子数组,这个数组便是保存时间窗的数组。
在了解了 rollingCounterInSecond
的类型之后。进入ArrayMetric.addPass
@Override
public void addPass(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();// $1
wrap.value().addPass(count);// $2
}
$1.获取当前的时间窗。
$2.对时间窗内的对象增加qps
。
data
对象的类型在上面创建ArrayMetric
是已经贴出来,目前需要搞清楚WindowWrap
和它的value
是什么。
data.currentWindow
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());// $1
}
TimeUtil.currentTimeMillis()
获取当前系统的时间,sentinel
源码中频繁使用了该方法。
public final class TimeUtil {
private static volatile long currentTimeMillis;// $1
static {
currentTimeMillis = System.currentTimeMillis();// $2
Thread daemon = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
currentTimeMillis = System.currentTimeMillis();
try {
TimeUnit.MILLISECONDS.sleep(1);// $3
} catch (Throwable e) {
}
}
}
});
daemon.setDaemon(true);// $4
daemon.setName("sentinel-time-tick-thread");
daemon.start();
}
public static long currentTimeMillis() {
return currentTimeMillis;
}
}
$1.成员变量,并且用volatile
修饰,保证了线程之间的可见性。
$2.获取的系统的时间。
$3.休息1毫秒。
$4.设置为守护线程。
新建一个守护线程,一直获取并更新时间,通过这种方式,可以避免系统在并发获取时间时,每次操作都会调用native
方法,虽然时间更新只有1毫秒的间隔,在sentinel
中获取时间还是挺频繁的,这样做是可以减少部分native
方法的调用。
data.currentWindow(long timeMillis)
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis);// $1
// Calculate current bucket start time.
long windowStart = calculateWindowStart(timeMillis);// $1
while (true) {
WindowWrap<T> old = array.get(idx);// $3
if (old == null) {
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));// $4
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {// $5
return old;
} else if (windowStart > old.windowStart()) {
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);// $6
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
从这开始,每一个解释可能会比较长。
$1.通过当前时间计算下标。
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
// Calculate current index so we can map the timestamp to the leap array.
return (int)(timeId % array.length());
}
之后都以rollingCounterInSecond
为例,intervalInMs=1000,sampleCount=2、windowLengthInMs=500
。
例如当前时间timeMillis = 1602732298755,windowLengthInMs = 500,timeMillis / windowLengthInMs = 3205464597。timeId = 3205464597
,再通过timeId
对数组长度取模得到数组的下标,由于数组长度为2,timeId
是奇数,所以返回值为1。
$2.计算时间窗的开始时间。
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
例如当前时间 timeMillis = 1602732298755,windowLengthInMs = 500,timeMillis % windowLengthInMs = 255,timeMillis - 255= 1602732298500
。按照2个时间窗平分1000毫秒,那么第一个时间窗开始为时间为1602732298000
,第二个则为1602732298500
,所以这里取得是第二个窗口的开始时间,也就是下标为1的时间窗的开始时间,和$1对上了。
$3.获取原子数组中的WindowWrap
对象。
$4.如果没有从数组中获取到则新建。
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
windowStart
时间窗的开始时间,windowLengthInMs
每个时间窗的持续时间,newEmptyBucket
是一个方法,作用是创建一个空的桶,用来保存每个时间窗中统计的使用量。
public abstract T newEmptyBucket(long timeMillis);
该方法是抽象方法,需要子类来实现,已知rollingCounterInSecond
类型为OccupiableBucketLeapArray
,rollingCounterInMinute
类型为BucketLeapArray
,下面是这两个类的实现。
BucketLeapArray.newEmptyBucket
public MetricBucket newEmptyBucket(long time) {
return new MetricBucket();
}
OccupiableBucketLeapArray.newEmptyBucket
public MetricBucket newEmptyBucket(long time) {
MetricBucket newBucket = new MetricBucket();
MetricBucket borrowBucket = borrowArray.getWindowValue(time);
if (borrowBucket != null) {
newBucket.reset(borrowBucket);
}
return newBucket;
}
两者区别并不大,都是创建的MetricBucket
对象,有所不同的是OccupiableBucketLeapArray
支持借用下一个时间窗,所以在创建MetricBucket
对象时,需要将借用过的时间窗的使用数量设置到新建对象中。
MetricBucket
public class MetricBucket {
private final LongAdder[] counters;
private volatile long minRt;
public MetricBucket() {
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
counters[event.ordinal()] = new LongAdder();
}
initMinRt();
}
private void initMinRt() {
this.minRt = Constants.TIME_DROP_VALVE;
}
...
}
看到它的构造方法,MetricEvent.values
对应6种事件分别为 PASS、BLOCK、EXCEPTION、SUCCESS、RT、OCCUPIED_PASS
,每个事件都对应一个LongAdder
对象。LongAdder
并发效率优于AtomicInteger、AtomicLong
,这里不展开它了,感兴趣可以搜搜。
$5.如果存在则比较开始时间是否相等,相等就直接返回。
$6.如果当前时间大于从数组中获取的时间窗的开始时间,则获取锁,并重置该时间窗的信息。updateLock
的类型为ReentrantLock
。
LeapArray.resetWindowTo
protected abstract WindowWrap<T> resetWindowTo(WindowWrap<T> windowWrap, long startTime);
也是一个抽象方法,所以OccupiableBucketLeapArray
和BucketLeapArray
又有一点点区别。
BucketLeapArray.resetWindowTo
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
// Update the start time and reset value.
w.resetTo(startTime);
w.value().reset();
return w;
}
OccupiableBucketLeapArray.resetWindowTo
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) {
// Update the start time and reset value.
w.resetTo(time);
MetricBucket borrowBucket = borrowArray.getWindowValue(time);
if (borrowBucket != null) {
w.value().reset();
w.value().addPass((int)borrowBucket.pass());
} else {
w.value().reset();
}
return w;
}
OccupiableBucketLeapArray
中多做的操作就是把原先借用时间窗的使用量,加到现在这个时间窗中。
两个类的resetWindowTo
方法中,主要的是w.resetTo(time)
和w.value().reset()
方法。
resetTo
设置新的开始时间。
public WindowWrap<T> resetTo(long startTime) {
this.windowStart = startTime;
return this;
}
w.value().reset()
,w.value()
获取MetricBucket
对象。
MetricBucket.reset
public MetricBucket reset() {
for (MetricEvent event : MetricEvent.values()) {
counters[event.ordinal()].reset();
}
initMinRt();
return this;
}
重置LongAdder
集合中每个LongAdder
对象的计数,获取时间窗就到这结束了。
在获取到了时间窗对象之后,便开始进行记录。
wrap.value().addPass(count);
MetricBucket.addPass
public void addPass(int n) {
add(MetricEvent.PASS, n);
}
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
从数组获取LongAdder
对象并调用它的add
增加使用量。
这就是sentinel
的滑动时间窗统计的实现,看下来是不是发现挺简单的呢。




