目录
1 背景
2 起源
3 原理
3.1 固定窗口
3.2 滑动窗口
3.3 漏桶
3.4 令牌桶
4 实践
4.1 技术选型
4.2 技术实现
5 源码浅析
5.1 acquire
5.2 release
6 结论


背景
在一个高并发系统中,限流(Rate-limiting)是一个我们经常听说到的概念。限流是一种常见的防御措施,避免自己的服务资源被过度使用,以保证服务的可用性。
日常生活中,限流无处不在。每家入户的电闸是对电力资源的限流,用餐高峰期排号系统是对用餐服务的限流,手机网络的限速是对移动网络资源的限流。
在信息技术领域当然也有类似的场景。比如双十一电商平台不断刷新时弹出的“请稍后再试”的提醒,访问一些论坛提示“too many requests”的报错等,都是限流的结果。
结合实际业务需求,从基本原理、实践与源码分析三个方面简单介绍限流。

起源
Atome中台某系统(后文以A系统指代)存在一个定时任务,每天定时执行。每次执行定时任务时,会调用另一系统(后文以B系统指代)的接口。A的数据量越多,调用B的次数越多。
随着业务的发展,A服务每天的数据量逐渐增加,使得定时任务的耗时越来越长。为了支持后续业务的发展,需要对服务进行改造优化。其中一项优化工作是在A系统中使用多个线程同时调用B系统的接口。但是需要注意到,B系统本身可能带有限流设计。如果同时调用B系统线程的数量过多,可能会被B系统拒绝服务。因此需要在A服务的定时任务中,对调用B系统接口的速率做限制限流。

原理
限流其实很容易理解。通过限制对服务资源的访问,保证请求都够正常处理。
一般在应用层,限流设计有异步响应和同步响应两种思路。
其中异步响应指的是服务端接收到请求后,将请求排队,按照先后顺序处理请求,展现在用户面前结果并不是实时的。异步响应比较常见的场景比如微信支付/支付宝等系统的小额提现,官方说法是资金在2小时之内会到账,但是用户通常会发现资金基本上接近实时处理,1分钟之内一般能完成。不过在这个例子里面,异步设计的初衷并不是为了限流,更多是为了提升用户体验。异步响应的实现通常是使用消息队列(常见的开源队有Kafka、RocketMQ、RabbitMQ等)保存客户端的请求,服务端顺序消费队列里面的请求;处理完成后回调客户端,通知请求已经处理完毕。
同步响应指的是在用户操作后,处理结果能够直接返回给用户。如果服务端资源足够处理请求,则处理请求并返回结果;如果服务端的资源不足以处理请求,就直接告诉用户请求失败或稍后再试。比如某宝双11大促抢购折扣商品,结果就是实时返回给用户的:如果下单时商品已经没有库存,可能会提示用户换一种型号或者看看其他商品。同步响应的实现一般有控制访问速率和控制并发数量和两种模式。其中,控制并发数量很容易理解和实现,而控制访问速率一般有固定窗口(Fixed window)、滑动窗口(Sliding window)、令牌桶(Token bucket)、漏桶(Leaky bucket)等算法。
3.1 固定窗口
在软件层面,固定窗口的实现也是最简单的。不过在我们的业务场景中不太适合使用这种方法。不然用户可能会发现我们的服务时好时坏,用户体验很差。3.2 滑动窗口

相比于固定窗口,滑动窗口的设计可以让限流的效果更加均匀。需要满足在任意时刻开始的一个小时之内,请求的数量都小于阈值。因此,滑动窗口可以规避固定窗口的双倍请求问题。

不过,滑动窗口在我们的业务中还是不足够精细。尤其是在用户使用的高峰期同样有可能会导致大量用户请求失败。
漏桶算法思路很像一个有漏洞的水桶(所以叫漏桶算法)。水(请求)会先进入到漏桶里;漏桶以一定的速度出水(处理请求),当水流入速度过大会直接溢出(拒绝请求)。可以看出,漏桶算法是一个能够严格控制客户端请求频次的方法。如果结合消息队列作为缓冲区,则可以最大限度的处理客户端的所有请求,是一个比较实用的解决方案(缓冲区理念在很多地方都有用到,例如磁盘IO缓冲区,TCP缓冲区等)。

实际上,漏桶也可以看成是一种滑动窗口方法,这个滑动窗口的窗口时长极短,容量也极少。
3.4 令牌桶
令牌桶算法的原理是服务端往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。令牌桶算法通过发放令牌,根据设定的频率做请求频率限制,容量限制等。另外,这个算法还能控制不同的服务的权重,比如一个流程较长的服务可以获取多个令牌,一个简单的服务可以只获取一个令牌。

令牌桶算法相比漏桶算法而言区别在于,对于请求不太有规律、可能偶尔存在一个请求高峰的情况下,令牌桶会更加灵活一些。在实践中,令牌桶也是常用的一个方法。

实践
回到上述的业务背景,显然,对于定时任务而言,显然没有必要用窗口方法对A服务发起的请求进行限制,因此处理这个问题的第一反应是用令牌桶RateLimiter对A服务的调用做限制。
但是思考后,觉得使用RateLimiter可能会有一些隐患。考虑如下的情况:B系统的接口由于某种原因(比如网络抖动)延迟增加。假设我们的设计允许最多同时有10个线程访问B端。由于A端产生线程访问的速率是恒定的,那么如果网络波动了几秒钟,可能同时会有超出10个线程同时访问B服务,此时A服务的调用可能会被被拒绝,导致定时任务数据出错。
因此,如果B端同时接受的请求数量有限制,那么限制并发量对A端的访问线程做限制可能会有更好的效果。在实际的优化工作中,使用了java.util.current包里的Semaphore信号量来实现并发控制。
关键代码行数其实比较少,调用Semaphore进行acquire和release就能实现我们设想中的限流功能。
JavaClass TimerTask {public void execute() {items.forEach(item -> {try {semaphore.acquire();executor.run(item, semaphore);} catch (InterruptedException e) {log.error(e.toString());}});}}Class Executor {@Asyncpublic void run(Item item, Semaphore semaphore) {try {callRemoteAPI(item);} catch (Exception e) {log.error(e);}semaphore.release();}}

源码浅析
既然用到了Semaphore这个Java类,而且平时工作中使用频率也非常低。那么我们可以简单看看这个类内部究竟是如何实现的。下文所有的源码分析都基于JDK 8。
进入到Semaphore类可以看到,它的内部实现依赖于AbstractQueuedSynchronizer(AQS)类。Java工程师们一定会知道这个类,因为这个类是Java八股文的重要内容,面试常问。Semaphore类的UML结构如下。

可以看出,Semaphore类内部有3个类,都继承了AQS。其中一个是抽象类Sync,另外两个分别是FairSync公平锁和NonfairSync非公平锁。
在上述的业务中,除去构造方法之外,还用到了semaphore.acquire()和semaphore.release()两个方法。我们看看Semaphore内部的关键方法具体是如何实现的。
acquire的关键代码段如下。
Javapublic void acquire() throws InterruptedException {// 公平锁获取锁的时候,获取锁的个数是1sync.acquireSharedInterruptibly(1);}// 获取锁public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 如果剩余锁数量小于0则锁获取失败,需要等待。否则就直接返回,说明获取锁成功。if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}// 尝试获取共享锁protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;// 获取AQS中State的数值int available = getState();// 将剩余锁的数量acquires(公平锁则-1)int remaining = available - acquires;// 如果剩余锁数量小于0,则将available置为原有值并返回剩余数量// 如果剩余锁数量大于0,则将available设为remaining。如果设置失败,则说明有其他线程已经在内存中修改过值了,需要重新循环获取锁if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}// 如果锁暂时获取失败,则进入等待队列private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 这里等待的实现用的是AQS的队列,将元素添加到队列尾部final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {// 获取当前节点的前节点final Node p = node.predecessor();if (p == head) {// 如果是头结点,则尝试获取锁int r = tryAcquireShared(arg);if (r >= 0) {// node获取锁后出队,设置新head,并将共享性传播给后继节点setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}// 对节点进行检查和更新状态,如果线程应该阻塞,返回 true。// 线程获取锁失败后入队列并不会立刻阻塞,而是判断是否应该阻塞。只有当头结点的waitStatus是SIGNAL时shouldParkAfterFailedAcquire会返回true继而阻塞if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}
TypeScriptpublic void release() {sync.releaseShared(1);}// 释放共享锁入口public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}// 尝试释放共享锁protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");// 将current + 1,如果失败说明内存中值已经变化了,重新循环if (compareAndSetState(current, next))return true;}}// 释放共享锁逻辑private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}
相比于acquire,release的逻辑稍稍少一些。
综上可以看出,Semaphore本质上是一个共享锁,依赖于AQS进行实现。通过设置AQS中的state变量,实现线程间信号量的共享。在公平锁的场景下,调用acquire时,state - 1;当调用release时,state + 1。当state数值不够的时候,就使用AQS中的队列排队等待。在Semaphore和AQS的实现中,还使用到了更底层的CAS保证变量的原子性,这里就不再赘述啦。

结论
根据实际业务需求,从基本原理、实践和与源码浅析几个方面介绍了限流及简单的实现,并最终使用Semaphore信号量实现了访问的限流。在生产环境上,可以发现优化后比优化前的执行效率有成倍的增长。
[9] https://www.cnblogs.com/leesf456/p/5414778.html







