暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

sentinel 滑动时间窗统计的实现

dragon元 2020-10-21
661

在上一章中,FlowSlot
限流校验中是获取node
中的使用量加一来判断是否触发限流,没有在方法中增加使用量,由此可知,统计使用量是其他的solt
做的工作。

所以跟随这章可以思考一下这两个问题:

  1. Sentinel
    底层是如何计算线上系统实时QPS的?
  2. Sentinel
    底层滑动时间窗限流算法怎么实现的?

定位到StatisticSlot
类。

  • StatisticSlot
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
    try {
        // Do some checking.
        fireEntry(context, resourceWrapper, node, count, prioritized, args);// 执行下一个 solt
        // Request passed, add thread count and pass count.
        node.increaseThreadNum();// $1
        node.addPassRequest(count);// $2
        if (context.getCurEntry().getOriginNode() != null) {// $3
            // Add count for origin node.
            context.getCurEntry().getOriginNode().increaseThreadNum();
            context.getCurEntry().getOriginNode().addPassRequest(count);
        }
        if (resourceWrapper.getEntryType() == EntryType.IN) {// $4
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }
        // Handle pass event with registered entry callback handlers.
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (PriorityWaitException ex) {
        ...
    } catch (BlockException e) {
        ...
    } catch (Throwable e) {
        ...
    }
}

$1.增加线程使用量。

$2.增加qps
使用量。

$3.如果存在调用源的node
对象,则增加调用源node
对象的使用量,默认是为空的。

$4.如果为入站流量,则全局统计增加使用量。

这一段代码有点长了,就先不把异常处理贴出来。entry
方法首先调用fireEntry
方法,也就是将处理先交给下个一个处理器,之后再增加使用量,等到后续处理器处理完毕,再增加使用量。
$1、$2
代码看来,是使用了入参中的node
对象,而node
对象内部做了什么操作呢?首先了解node
对象是从哪里创建的。根据之前文章 sentinel
的实现 内容可以知道,处理器链的执行链路为  NodeSelectorSlot
-> ClusterBuilderSlot
-> LogSlot
-> StatisticSlot
-> ParamFlowSlot
-> SystemSlot
-> AuthoritySlot
-> FlowSlot
-> DegradeSlot
,而NodeSelectorSlot
的作用便是创建node
对象。

  • NodeSelectorSlot
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
    throws Throwable 
{
    DefaultNode node = map.get(context.getName());// $1
    if (node == null) {
        synchronized (this) {
            node = map.get(context.getName());
            if (node == null) {
                node = new DefaultNode(resourceWrapper, null);// $2
                HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                cacheMap.putAll(map);
                cacheMap.put(context.getName(), node);
                map = cacheMap;
            }
            // Build invocation tree
            ((DefaultNode)context.getLastNode()).addChild(node);
        }
    }
    context.setCurNode(node);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

$1.通过上下文名称获取,上下文名称有2个WEB_SERVLET_CONTEXT_NAME
CONTEXT_DEFAULT_NAME
。关闭了sentinel
filter
功能,名称为CONTEXT_DEFAULT_NAME
反之WEB_SERVLET_CONTEXT_NAME

$2.如果不存在则创建对象,类型为DefaultNode

获取到node
对象之后,设置到上下文信息中,并将node
设置为下一个solt
的入参,下面来看看统计的实现。

线程使用量的统计。

  • DefaultNode.increaseThreadNum
@Override
public void increaseThreadNum() {
    super.increaseThreadNum();// $1
    this.clusterNode.increaseThreadNum();// $2
}

$1.调用父类的增加线程使用量方法。

$2.调用clusterNode
的增加线程使用量方法。

DefaultNode
继承自StatisticNode
类。

  • StatisticNode.increaseThreadNum
/**
 * The counter for thread count.
 */

private AtomicInteger curThreadNum = new AtomicInteger(0);

@Override
public void increaseThreadNum() {
    curThreadNum.incrementAndGet();
}

做的操作还是比较简单的,对一个原子类进行自增。

clusterNode
是在什么时候被创建的呢。看到ClusterBuilderSlot.entry
方法

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
    throws Throwable 
{
    if (clusterNode == null) {// $1
        synchronized (lock) {
            if (clusterNode == null) {
                // Create the cluster node.
                clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                newMap.putAll(clusterNodeMap);
                newMap.put(node.getId(), clusterNode);
                clusterNodeMap = newMap;
            }
        }
    }
    node.setClusterNode(clusterNode);// $2
    ...
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

$1.不存在则创建,clusterNode
对象为成员变量,所以只会初始化一次,创建的对象类型为ClusterNode

$2.设置进node
对象中。

ClusterNode
DefaultNode
都继承自StatisticNode
ClusterNode
并没有重写父类的increaseThreadNum
方法,所以this.clusterNode.increaseThreadNum();
也会直接调用它父类的方法。

StatisticNode.increaseThreadNum
方法就在上面,就不重复了。

增加线程使用量的代码就这些了,并没有难以理解的东西。而在目标方法执行完之后,会对线程的使用量做减的操作。在StatisticSlot.exit
方法中

public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
    DefaultNode node = (DefaultNode)context.getCurNode();
    if (context.getCurEntry().getError() == null) {
        // Calculate response time (max RT is TIME_DROP_VALVE).
        long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
        if (rt > Constants.TIME_DROP_VALVE) {
            rt = Constants.TIME_DROP_VALVE;
        }
        // Record response time and success count.
        node.addRtAndSuccess(rt, count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
        }
        node.decreaseThreadNum();// $1
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().decreaseThreadNum();
        }
        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
            Constants.ENTRY_NODE.decreaseThreadNum();
        }
    } else {
        // Error may happen.
    }
    ...
}

$1.减去线程的使用量。

每个slot
都实现了exit
方法,会按照处理链的顺序依次执行一遍。

QPS使用量统计,在这里用到了滑动时间窗统计

qps
计数相对于线程计数会比较麻烦,线程限制数量因为没有时间的限制,所以无论在一个多长的时间段内,只要线程使用数量达到阈值,便触发熔断或限流;而qps
则是在一个时间范围内,当qps
在这个时间范围内达到阈值便触发熔断或限流,来看看sentinel
是如何实现的吧。

  • DefaultNode.addPassRequest
@Override
public void addPassRequest(int count) {
    super.addPassRequest(count);
    this.clusterNode.addPassRequest(count);
}

进入StatisticNode.addPassRequest

@Override
public void addPassRequest(int count) {
    rollingCounterInSecond.addPass(count);
    rollingCounterInMinute.addPass(count);
}

rollingCounterInSecond
rollingCounterInMinute
这个对象又是什么呢?看到创建它们的地方。

  • StatisticNode
/**
 *
 * 保存最近{@code INTERVAL}秒的统计信息。{@code INTERVAL}被划分为时间跨度 通过给定的{@code sampleCount}。
 */

private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);// $1
/**
 * 保存最近60秒的统计信息。windowLengthInMs故意设置为1000毫秒,也就是说每一个桶每秒,这样我们就可以得到准确的每秒统计数据。
 */

private transient Metric rollingCounterInMinute = new ArrayMetric(6060 * 1000false);


$1.样本数量:SampleCountProperty.SAMPLE_COUNT = 2
,时间间隔:IntervalProperty.INTERVAL = 1000

rollingCounterInSecond
是记录秒钟内的,rollingCounterInMinute
是记录分钟内的,限流校验是获取秒钟内记录的,只要关注rollingCounterInSecond
对象即可。

通过代码也可知,IntervalProperty.INTERVAL
配置的是1000,所以统计的是1秒钟内的使用量,也就是qps
默认的时间范围是1秒钟,当然这个也写死了,改不了。

源码上的注释已经将这两个类的行为描述清楚了,下面是这个类的两个构造方法。

public ArrayMetric(int sampleCount, int intervalInMs) {
    this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}

public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
    if (enableOccupy) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    } else {
        this.data = new BucketLeapArray(sampleCount, intervalInMs);
    }
}

通过构造可以知道rollingCounterInSecond
的类型为OccupiableBucketLeapArray
,而rollingCounterInMinute
的类型为BucketLeapArray

OccupiableBucketLeapArray
顾名思义,可占用的。这里可以留心一下,之后的内容也会说到这个效果,不过这并不是重点,跟着文章了解一下就可以。

BucketLeapArray
OccupiableBucketLeapArray
都继承了LeapArray<MetricBucket>
,下面是他们的构造方法。

  • BucketLeapArray
public BucketLeapArray(int sampleCount, int intervalInMs) {
    super(sampleCount, intervalInMs);
}

  • OccupiableBucketLeapArray
public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
    // This class is the original "CombinedBucketArray".
    super(sampleCount, intervalInMs);
    this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);// $1
}

$1.创建可借的BucketLeap
对象,这个对象就是Occupiable
的关键。

这两个类在构造中都调用了父类的构造方法,接下来就是滑动时间窗的实现了。

  • LeapArray
public LeapArray(int sampleCount, int intervalInMs) {
    AssertUtil.isTrue(sampleCount > 0"bucket count is invalid: " + sampleCount);
    AssertUtil.isTrue(intervalInMs > 0"total time interval of the sliding window should be positive");
    AssertUtil.isTrue(intervalInMs % sampleCount == 0"time span needs to be evenly divided");
    this.windowLengthInMs = intervalInMs / sampleCount;// $1
    this.intervalInMs = intervalInMs;// $2
    this.sampleCount = sampleCount;// $3
    this.array = new AtomicReferenceArray<>(sampleCount);// $4
}

$1.通过时间间隔和时间窗数量,计算得出时间窗的时间长度。例如rollingCounterInSecond
这个变量的创建,该值就为 1000 / 2 = 500。

$2.记录时间间隔

$3.记录时间窗的数量。

$4.通过传入的样本数量创建AtomicReferenceArray
,创建一个原子数组,这个数组便是保存时间窗的数组。

  • 在了解了rollingCounterInSecond
    的类型之后。进入ArrayMetric.addPass
@Override
public void addPass(int count) {
    WindowWrap<MetricBucket> wrap = data.currentWindow();// $1
    wrap.value().addPass(count);// $2
}

$1.获取当前的时间窗。

$2.对时间窗内的对象增加qps

data
对象的类型在上面创建ArrayMetric
是已经贴出来,目前需要搞清楚WindowWrap
和它的value
是什么。

  • data.currentWindow
public WindowWrap<T> currentWindow() {
    return currentWindow(TimeUtil.currentTimeMillis());// $1
}

TimeUtil.currentTimeMillis()
获取当前系统的时间,sentinel
源码中频繁使用了该方法。

public final class TimeUtil {
    private static volatile long currentTimeMillis;// $1
    static {
        currentTimeMillis = System.currentTimeMillis();// $2
        Thread daemon = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    currentTimeMillis = System.currentTimeMillis();
                    try {
                        TimeUnit.MILLISECONDS.sleep(1);// $3
                    } catch (Throwable e) {
                    }
                }
            }
        });
        daemon.setDaemon(true);// $4
        daemon.setName("sentinel-time-tick-thread");
        daemon.start();
    }
    public static long currentTimeMillis() {
        return currentTimeMillis;
    }
}

$1.成员变量,并且用volatile
修饰,保证了线程之间的可见性。

$2.获取的系统的时间。

$3.休息1毫秒。

$4.设置为守护线程。

新建一个守护线程,一直获取并更新时间,通过这种方式,可以避免系统在并发获取时间时,每次操作都会调用native
方法,虽然时间更新只有1毫秒的间隔,在sentinel
中获取时间还是挺频繁的,这样做是可以减少部分native
方法的调用。

  • data.currentWindow(long timeMillis)
public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }
    int idx = calculateTimeIdx(timeMillis);// $1
    // Calculate current bucket start time.
    long windowStart = calculateWindowStart(timeMillis);// $1
    while (true) {
        WindowWrap<T> old = array.get(idx);// $3
        if (old == null) {
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));// $4
            if (array.compareAndSet(idx, null, window)) {
                // Successfully updated, return the created bucket.
                return window;
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {// $5
            return old;
        } else if (windowStart > old.windowStart()) {
            if (updateLock.tryLock()) {
                try {
                    // Successfully get the update lock, now we reset the bucket.
                    return resetWindowTo(old, windowStart);// $6
                } finally {
                    updateLock.unlock();
                }
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            // Should not go through here, as the provided time is already behind.
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}


从这开始,每一个解释可能会比较长。

$1.通过当前时间计算下标。

private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
    long timeId = timeMillis / windowLengthInMs;
    // Calculate current index so we can map the timestamp to the leap array.
    return (int)(timeId % array.length());
}


之后都以rollingCounterInSecond
为例,intervalInMs=1000,sampleCount=2、windowLengthInMs=500

例如当前时间timeMillis = 1602732298755,windowLengthInMs = 500,timeMillis / windowLengthInMs = 3205464597。timeId = 3205464597
,再通过timeId
对数组长度取模得到数组的下标,由于数组长度为2,timeId
是奇数,所以返回值为1。

$2.计算时间窗的开始时间。

protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
    return timeMillis - timeMillis % windowLengthInMs;
}


例如当前时间 timeMillis = 1602732298755,windowLengthInMs = 500,timeMillis % windowLengthInMs = 255,timeMillis - 255= 1602732298500
。按照2个时间窗平分1000毫秒,那么第一个时间窗开始为时间为1602732298000
,第二个则为1602732298500
,所以这里取得是第二个窗口的开始时间,也就是下标为1的时间窗的开始时间,和$1对上了。

$3.获取原子数组中的WindowWrap
对象。

$4.如果没有从数组中获取到则新建。

WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));

windowStart
时间窗的开始时间,windowLengthInMs
每个时间窗的持续时间,newEmptyBucket
是一个方法,作用是创建一个空的桶,用来保存每个时间窗中统计的使用量。

public abstract T newEmptyBucket(long timeMillis);

该方法是抽象方法,需要子类来实现,已知rollingCounterInSecond
类型为OccupiableBucketLeapArray
rollingCounterInMinute
类型为BucketLeapArray
,下面是这两个类的实现。

  • BucketLeapArray.newEmptyBucket
public MetricBucket newEmptyBucket(long time) {
    return new MetricBucket();
}

  • OccupiableBucketLeapArray.newEmptyBucket
public MetricBucket newEmptyBucket(long time) {
    MetricBucket newBucket = new MetricBucket();
    MetricBucket borrowBucket = borrowArray.getWindowValue(time);
    if (borrowBucket != null) {
        newBucket.reset(borrowBucket);
    }
    return newBucket;
}

两者区别并不大,都是创建的MetricBucket
对象,有所不同的是OccupiableBucketLeapArray
支持借用下一个时间窗,所以在创建MetricBucket
对象时,需要将借用过的时间窗的使用数量设置到新建对象中。

  • MetricBucket
public class MetricBucket {

    private final LongAdder[] counters;

    private volatile long minRt;

    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();
    }
    
    private void initMinRt() {
        this.minRt = Constants.TIME_DROP_VALVE;
    }
    ...
}

看到它的构造方法,MetricEvent.values
对应6种事件分别为 PASS、BLOCK、EXCEPTION、SUCCESS、RT、OCCUPIED_PASS
,每个事件都对应一个LongAdder
对象。LongAdder
并发效率优于AtomicInteger、AtomicLong
,这里不展开它了,感兴趣可以搜搜。

$5.如果存在则比较开始时间是否相等,相等就直接返回。

$6.如果当前时间大于从数组中获取的时间窗的开始时间,则获取锁,并重置该时间窗的信息。updateLock
的类型为ReentrantLock

  • LeapArray.resetWindowTo
protected abstract WindowWrap<T> resetWindowTo(WindowWrap<T> windowWrap, long startTime);

也是一个抽象方法,所以OccupiableBucketLeapArray
BucketLeapArray
又有一点点区别。

  • BucketLeapArray.resetWindowTo
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
    // Update the start time and reset value.
    w.resetTo(startTime);
    w.value().reset();
    return w;
}

  • OccupiableBucketLeapArray.resetWindowTo
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) {
    // Update the start time and reset value.
    w.resetTo(time);
    MetricBucket borrowBucket = borrowArray.getWindowValue(time);
    if (borrowBucket != null) {
        w.value().reset();
        w.value().addPass((int)borrowBucket.pass());
    } else {
        w.value().reset();
    }
    return w;
}

OccupiableBucketLeapArray
中多做的操作就是把原先借用时间窗的使用量,加到现在这个时间窗中。

两个类的resetWindowTo
方法中,主要的是w.resetTo(time)
w.value().reset()
方法。

  • resetTo
    设置新的开始时间。
public WindowWrap<T> resetTo(long startTime) {
    this.windowStart = startTime;
    return this;
}

w.value().reset()
w.value()
获取MetricBucket
对象。

  • MetricBucket.reset
public MetricBucket reset() {
    for (MetricEvent event : MetricEvent.values()) {
        counters[event.ordinal()].reset();
    }
    initMinRt();
    return this;
}

重置LongAdder
集合中每个LongAdder
对象的计数,获取时间窗就到这结束了。

在获取到了时间窗对象之后,便开始进行记录。

wrap.value().addPass(count);

  • MetricBucket.addPass
public void addPass(int n) {
    add(MetricEvent.PASS, n);
}

public MetricBucket add(MetricEvent event, long n) {
    counters[event.ordinal()].add(n);
    return this;
}

从数组获取LongAdder
对象并调用它的add
增加使用量。

这就是sentinel
的滑动时间窗统计的实现,看下来是不是发现挺简单的呢。

文章转载自dragon元,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论