暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

起源于定时任务优化的限流思考

领创集团Advance Group 2022-01-21
687

目录

1 背景

2 起源

3 原理

    3.1 固定窗口

    3.2 滑动窗口

    3.3 漏桶

    3.4 令牌桶

4 实践

    4.1 技术选型

    4.2 技术实现

5 源码浅析

    5.1 acquire

    5.2 release

6 结论


背景

在一个高并发系统中,限流(Rate-limiting)是一个我们经常听说到的概念。限流是一种常见的防御措施,避免自己的服务资源被过度使用,以保证服务的可用性。

日常生活中,限流无处不在。每家入户的电闸是对电力资源的限流,用餐高峰期排号系统是对用餐服务的限流,手机网络的限速是对移动网络资源的限流。

在信息技术领域当然也有类似的场景。比如双十一电商平台不断刷新时弹出的“请稍后再试”的提醒,访问一些论坛提示“too many requests”的报错等,都是限流的结果。

结合实际业务需求,从基本原理、实践与源码分析三个方面简单介绍限流。

起源

Atome中台某系统(后文以A系统指代)存在一个定时任务,每天定时执行。每次执行定时任务时,会调用另一系统(后文以B系统指代)的接口。A的数据量越多,调用B的次数越多。

随着业务的发展,A服务每天的数据量逐渐增加,使得定时任务的耗时越来越长。为了支持后续业务的发展,需要对服务进行改造优化。其中一项优化工作是在A系统中使用多个线程同时调用B系统的接口。但是需要注意到,B系统本身可能带有限流设计。如果同时调用B系统线程的数量过多,可能会被B系统拒绝服务。因此需要在A服务的定时任务中,对调用B系统接口的速率做限制限流。

原理

限流其实很容易理解。通过限制对服务资源的访问,保证请求都够正常处理。

一般在应用层,限流设计有异步响应和同步响应两种思路。

其中异步响应指的是服务端接收到请求后,将请求排队,按照先后顺序处理请求,展现在用户面前结果并不是实时的。异步响应比较常见的场景比如微信支付/支付宝等系统的小额提现,官方说法是资金在2小时之内会到账,但是用户通常会发现资金基本上接近实时处理,1分钟之内一般能完成。不过在这个例子里面,异步设计的初衷并不是为了限流,更多是为了提升用户体验。异步响应的实现通常是使用消息队列(常见的开源队有Kafka、RocketMQ、RabbitMQ等)保存客户端的请求,服务端顺序消费队列里面的请求;处理完成后回调客户端,通知请求已经处理完毕。

同步响应指的是在用户操作后,处理结果能够直接返回给用户。如果服务端资源足够处理请求,则处理请求并返回结果;如果服务端的资源不足以处理请求,就直接告诉用户请求失败或稍后再试。比如某宝双11大促抢购折扣商品,结果就是实时返回给用户的:如果下单时商品已经没有库存,可能会提示用户换一种型号或者看看其他商品。同步响应的实现一般有控制访问速率和控制并发数量和两种模式。其中,控制并发数量很容易理解和实现,而控制访问速率一般有固定窗口(Fixed window)、滑动窗口(Sliding window)、令牌桶(Token bucket)、漏桶(Leaky bucket)等算法。

3.1 固定窗口

固定窗口指的是在指定的时间范围内,只能有固定数量的请求能被处理。比如,医院可能开放了每小时100个就诊名额,先到先得。
在软件层面,固定窗口的实现也是最简单的。不过在我们的业务场景中不太适合使用这种方法。不然用户可能会发现我们的服务时好时坏,用户体验很差。

3.2 滑动窗口

“滑动窗口”这个词可能会程序员们让人想起TCP网络协议。固定窗口方法可能会碰到比如限流不均匀(比如所有的患者都在8:30就到医院了)和双倍请求问题(比如第一批患者都在所在窗口的后沿9:29到达医院,而下一批患者在他们所在窗口的最前沿9:31左右也全部到达了医院)。

相比于固定窗口,滑动窗口的设计可以让限流的效果更加均匀。需要满足在任意时刻开始的一个小时之内,请求的数量都小于阈值。因此,滑动窗口可以规避固定窗口的双倍请求问题。

不过,滑动窗口在我们的业务中还是不足够精细。尤其是在用户使用的高峰期同样有可能会导致大量用户请求失败。

3.3 漏桶

漏桶算法思路很像一个有漏洞的水桶(所以叫漏桶算法)。水(请求)会先进入到漏桶里;漏桶以一定的速度出水(处理请求),当水流入速度过大会直接溢出(拒绝请求)。可以看出,漏桶算法是一个能够严格控制客户端请求频次的方法。如果结合消息队列作为缓冲区,则可以最大限度的处理客户端的所有请求,是一个比较实用的解决方案(缓冲区理念在很多地方都有用到,例如磁盘IO缓冲区,TCP缓冲区等)。

实际上,漏桶也可以看成是一种滑动窗口方法,这个滑动窗口的窗口时长极短,容量也极少。

3.4 令牌

令牌桶算法的原理是服务端往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。令牌桶算法通过发放令牌,根据设定的频率做请求频率限制,容量限制等。另外,这个算法还能控制不同的服务的权重,比如一个流程较长的服务可以获取多个令牌,一个简单的服务可以只获取一个令牌。

令牌桶算法相比漏桶算法而言区别在于,对于请求不太有规律、可能偶尔存在一个请求高峰的情况下,令牌桶会更加灵活一些。在实践中,令牌桶也是常用的一个方法。

实践

4.1 技术选型

回到上述的业务背景,显然,对于定时任务而言,显然没有必要用窗口方法对A服务发起的请求进行限制,因此处理这个问题的第一反应是用令牌桶RateLimiter对A服务的调用做限制。

但是思考后,觉得使用RateLimiter可能会有一些隐患。考虑如下的情况:B系统的接口由于某种原因(比如网络抖动)延迟增加。假设我们的设计允许最多同时有10个线程访问B端。由于A端产生线程访问的速率是恒定的,那么如果网络波动了几秒钟,可能同时会有超出10个线程同时访问B服务,此时A服务的调用可能会被被拒绝,导致定时任务数据出错。

因此,如果B端同时接受的请求数量有限制,那么限制并发量对A端的访问线程做限制可能会有更好的效果。在实际的优化工作中,使用了java.util.current包里的Semaphore信号量来实现并发控制。

4.2 技术实现

关键代码行数其实比较少,调用Semaphore进行acquire和release就能实现我们设想中的限流功能。

    Java
    Class TimerTask {
        public void execute() {
            items.forEach(item -> {
                try {
                    semaphore.acquire();
                    executor.run(item, semaphore);
                } catch (InterruptedException e) {
                    log.error(e.toString());
                }
            });
        }
    }
    Class Executor { 
    @Async
        public void run(Item item, Semaphore semaphore) {
    try {
     callRemoteAPI(item);
            } catch (Exception e) {
                log.error(e);
            }
            semaphore.release();
        }
    }

    源码浅析

    既然用到了Semaphore这个Java类,而且平时工作中使用频率也非常低。那么我们可以简单看看这个类内部究竟是如何实现的。下文所有的源码分析都基于JDK 8。

    进入到Semaphore类可以看到,它的内部实现依赖于AbstractQueuedSynchronizer(AQS)类。Java工程师们一定会知道这个类,因为这个类是Java八股文的重要内容,面试常问。Semaphore类的UML结构如下。

    可以看出,Semaphore类内部有3个类,都继承了AQS。其中一个是抽象类Sync,另外两个分别是FairSync公平锁和NonfairSync非公平锁。

    在上述的业务中,除去构造方法之外,还用到了semaphore.acquire()和semaphore.release()两个方法。我们看看Semaphore内部的关键方法具体是如何实现的。

    5.1 acquire

    acquire的关键代码段如下。

      Java
      public void acquire() throws InterruptedException {
          // 公平锁获取锁的时候,获取锁的个数是1
          sync.acquireSharedInterruptibly(1);
      }
      // 获取锁
      public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
          if (Thread.interrupted())
              throw new InterruptedException();
          // 如果剩余锁数量小于0则锁获取失败,需要等待。否则就直接返回,说明获取锁成功。
          if (tryAcquireShared(arg) < 0)
              doAcquireSharedInterruptibly(arg);
      }
      // 尝试获取共享锁
      protected int tryAcquireShared(int acquires) {
          for (;;) {
              if (hasQueuedPredecessors())
                  return -1;
              // 获取AQS中State的数值
              int available = getState();
              // 将剩余锁的数量acquires(公平锁则-1)
              int remaining = available - acquires;
              // 如果剩余锁数量小于0,则将available置为原有值并返回剩余数量
              // 如果剩余锁数量大于0,则将available设为remaining。如果设置失败,则说明有其他线程已经在内存中修改过值了,需要重新循环获取锁
              if (remaining < 0 ||
                  compareAndSetState(available, remaining))
                  return remaining;
          }
      }
      // 如果锁暂时获取失败,则进入等待队列
      private void doAcquireSharedInterruptibly(int arg)
          throws InterruptedException {
          // 这里等待的实现用的是AQS的队列,将元素添加到队列尾部
          final Node node = addWaiter(Node.SHARED);
          boolean failed = true;
          try {
              for (;;) {
                  // 获取当前节点的前节点
                  final Node p = node.predecessor();
                  if (p == head) {
                      // 如果是头结点,则尝试获取锁
                      int r = tryAcquireShared(arg);
                      if (r >= 0) {
                          // node获取锁后出队,设置新head,并将共享性传播给后继节点
                          setHeadAndPropagate(node, r);
                          p.next = null; // help GC
                          failed = false;
                          return;
                      }
                  }
                  // 对节点进行检查和更新状态,如果线程应该阻塞,返回 true。
                  // 线程获取锁失败后入队列并不会立刻阻塞,而是判断是否应该阻塞。只有当头结点的waitStatus是SIGNAL时shouldParkAfterFailedAcquire会返回true继而阻塞
                  if (shouldParkAfterFailedAcquire(p, node) &&
                      parkAndCheckInterrupt())
                      throw new InterruptedException();
              }
          } finally {
              if (failed)
                  cancelAcquire(node);
          }
      }
      5.2 release
      关键代码段如下:
        TypeScript
        public void release() {
            sync.releaseShared(1);
        }


        // 释放共享锁入口
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }


        // 尝试释放共享锁
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                // 将current + 1,如果失败说明内存中值已经变化了,重新循环
                if (compareAndSetState(current, next))
                    return true;
            }
        }
        // 释放共享锁逻辑
        private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }

        相比于acquire,release的逻辑稍稍少一些。

        综上可以看出,Semaphore本质上是一个共享锁,依赖于AQS进行实现。通过设置AQS中的state变量,实现线程间信号量的共享。在公平锁的场景下,调用acquire时,state - 1;当调用release时,state + 1。当state数值不够的时候,就使用AQS中的队列排队等待。在Semaphore和AQS的实现中,还使用到了更底层的CAS保证变量的原子性,这里就不再赘述啦。

        结论

        根据实际业务需求,从基本原理、实践和与源码浅析几个方面介绍了限流及简单的实现,并最终使用Semaphore信号量实现了访问的限流。在生产环境上,可以发现优化后比优化前的执行效率有成倍的增长。

        参考资料
        [1] https://cloud.google.com/architecture/rate-limiting-strategies-techniques
        [2] https://blog.csdn.net/zrg523/article/details/82185088
        [3] https://blog.csdn.net/CrankZ/article/details/115152054
        [4] https://support.huawei.com/enterprise/en/doc/EDOC1100055046/33f24bb0/token-bucket
        [5] https://webeduclick.com/leaky-bucket-algorithm-in-computer-networks/ 
        [6] https://blog.csdn.net/CrankZ/article/details/115152054
        [7] https://pdai.tech/md/arch/arch-y-ratelimit.html
        [8] https://juejin.cn/post/6844903600418717704

        [9] https://www.cnblogs.com/leesf456/p/5414778.html


        关于领创集团
        (Advance Intelligence Group)
        领创集团成立于2016年,致力于通过科技创新的本地化应用,改造和重塑金融和零售行业,以多元化的业务布局打造一个服务于消费者、企业和商户的生态圈。集团旗下包含企业业务和消费者业务两大板块,企业业务包含ADVANCE.AI和Ginee,分别为银行、金融、金融科技、零售和电商行业客户提供基于AI技术的数字身份验证、风险管理产品和全渠道电商服务解决方案;消费者业务Atome Financial包括亚洲最大的先享后付平台Atome和数字信贷服务。2021年9月,领创集团宣布完成超4亿美元D轮融资,融资完成后领创集团估值已超20亿美元,成为新加坡最大的独立科技创业公司之一。
        往期回顾
        BREAK AWAY





        技术创想 | 用户中心 AWS WAF & CloudFront加速实践

        技术创想 | Atome 实用主义性能优化盘点-缓存篇


           一个空格引发的DuplicatekeyException


           

        文章转载自领创集团Advance Group,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

        评论