暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

时间轮算法

领创集团Advance Group 2022-05-12
1359


时间轮算法解决的问题

任务队列一般都会采用链表来实现,队列中的任务都包含任务执行的时间戳信息,我们通过不断的轮询来判断任务是否需要执行

但是这么做有一个问题,比如我们有100任务,那么定时任务队列就每次要扫100任务来确定任务是否要执行,这样会造成大量的空轮询。


时间轮算法

时间轮算法中轮询不在遍历所有任务,而是遍历时间刻度,当发现某一刻度上有任务(队列不为空)时,就会执行当前刻度上的所有任务。



使用场景

延迟队列的使用场景,时间轮都可以使用,比如:

  • 心跳检测(维护TCP连接, 30秒没有访问就将连接断开)
  • 会话、请求超时
  • 消息延迟推送
  • 业务超时订单(电商创建订单后锁定产品,30分钟内支付)


时间刻度问题

如果按1秒为一个时间刻度,那么一天会有86400个刻度,当我添加两个任务,一个是10秒后执行,另一个是86000秒后执行,那么其中大部分轮询都是空轮询,而且会浪费内存空间(每个刻度都有自己的任务队列)


基于round的时间轮算法

现在我们不增加刻度,而是通过添加round属性来描述任务。假设我们时间大小设置为20,我们添加三个任务:

  • 10秒后执行的任务
  • 30秒后执行的任务
  • 50秒后执行的任务


    那么这三个任务都在10刻度的任务队列中,分别为round=0, round=1, round=2。记 n 秒后执行任务, m为时间刻度,round计算方式为 n m, 任务位于 n % m 的任务队列中。

    时间轮每移动一个刻度时,遍历任务队列取出round=0的任务执行,然后将其余的任务 round-1。

    基于round的时间轮虽然解决了浪费内存空间的问题,但当时间刻度小,任务队列长的时候会增加耗时。


    分层时间轮算法

    与基于round的时间轮不同,分层时间轮采用层级联动的方式,具有以下特点:

    • 不做遍历计算round,每一个刻度中的任务都是应该执行的。
    • 当任务执行时间超过当前刻度范围时,进入下一层时间轮的范围。
    • 定时任务通过升级和降级来转移队列中的位置。


    时间轮升级

    时间轮通过升级和降级来联动层级,执行时间超过当前层级最大刻度时,就会进行升级,进入下一层时间轮。

    我们先创建三个任务,时间最大刻度设置为10,每个刻度间隔为1秒。

    • 2秒后执行的任务
    • 12秒后执行的任务
    • 13秒后秒执行的任务

      第一个任务会添加到第一层时间轮的 2 队列中去(每个刻度都有自己的队列)而第二,第三个任务因为超过10秒,就会升级到的第二层的 1 队列中,下面是代码。

        type TimingWheel struct {
        ticker *time.Ticker
        tickMs int // 时间刻度
        wheelSize int // 时间轮大小
        currentPosition int // 轮盘上的指针位置
        level int // 时间轮层数
        prevWheel *TimingWheel // 上层时间轮
        nextWheel *TimingWheel // 下层时间轮
        slots []*Slot // 队列
        stopChannel chan bool
        }


        func (e *TimingWheel) addTask(duration int, task *Task) {


        // 如果添加的刻度大于当前的时间轮刻度, 进入下一轮
        if duration > e.wheelSize*e.tickMs {
        if e.nextWheel == nil {
        newTw := newTimingWheel(
        e.tickMs*e.wheelSize,
        e.wheelSize,
        e.level+1,
        e,
        )
        e.nextWheel = newTw
        }
        e.nextWheel.addTask(duration-e.wheelSize*e.tickMs, task)
        } else {
        e.moveTask(duration, task)
        }
        }




        func (e *TimingWheel) AddTask(duration int, job func()) {
        expiration := GetNowTimestamp() + int64(duration)
        e.addTask(duration, &Task{expiration, job})
        }



        时间轮降级

        时间轮走完一轮之后会判断是否有下一层时间轮,如果有的话就会通过降级把下一层的时间轮中的任务转移到上层。

          func (e *TimingWheel) advance() {
          fmt.Println("advance", e.currentPosition, e.level)
          slot := e.slots[e.currentPosition]


          if e.level == 0 {
          // 表示第一层时间轮


          // 当前刻度队列中的任务全部取出并执行
          slot.trigger()
          } else {
          // 表示进入到下一层


          // 将队列中的定时任务取出来
          tasks := slot.getClear()


          // 当前时间戳
          currentTime := GetNowTimestamp()


          // 将任务转移到上一层队列中
          for t := tasks.Front(); t != nil; t = t.Next() {
          t := t.Value.(*Task)
          e.prevWheel.moveTask(int(t.expiration-currentTime), t)
          }
          }


          // 往前走一步
          e.currentPosition = (e.currentPosition + 1) % e.wheelSize


          if e.currentPosition == 0 {
          // 当下一层存在时
          if e.nextWheel != nil {
          e.nextWheel.advance()
          }
          }
          }


          // 时间轮添加到队列中的任务
          func (e *TimingWheel) moveTask(duration int, task *Task) {
          if duration <= 0 {
          e.slots[e.currentPosition].add(task)
          return
          }
          // 选择刻度所在的队列位置,并添加任务到队列中
          index := (int(duration/e.tickMs) + e.currentPosition) % e.wheelSize
          e.slots[index].add(task)
          }

          上面的例子中,当第一层时间轮走完,也就是过了10秒后,第二,第三个任务只剩下2秒和3秒的等待时间,我们就会把这些任务重新放到第一层时间轮的队列中完成降级。

          原本第二,第三个任务在第二层时间轮中在刻度10-19这个刻度的队列中,通过降级把它们分别放到第一层的2、3队列中。


          总结

          • 两层时间轮用20个队列就可以处理0-109秒内的定时任务,三层就可以处理0-1009 (具体跟设置的刻度时间和长度有关)。
          • 解决了轮询队列中的round,只要过期时间到了,队列中的任务都是需要处理的。
          • 处理队列任务一般都会额外开启线程,时间轮算法中线程数量只跟层级有关,不会因为队列的增多而改变,减少了资源开销。

            大家在处理定时任务时,都会使用什么方法呢?

            欢迎在评论区分享你的观点。




          感谢阅读「技术创想」第47期文章

          领创集团正在春季招聘中

          期待你的加入

          点击文末

          阅读原文

          获取更多

          招聘信息


          关于领创集团

          (Advance Intelligence Group)
          领创集团成立于 2016年,致力于通过科技创新的本地化应用,改造和重塑金融和零售行业,以多元化的业务布局打造一个服务于消费者、企业和商户的生态圈。集团旗下包含企业业务和消费者业务两大板块,企业业务包含 ADVANCE.AI 和 Ginee,分别为银行、金融、金融科技、零售和电商行业客户提供基于 AI 技术的数字身份验证、风险管理产品和全渠道电商服务解决方案;消费者业务 Atome Financial 包括亚洲领先的先享后付平台 Atome 和数字金融服务。2021年 9月,领创集团宣布完成超4亿美元 D 轮融资,融资完成后领创集团估值已超 20亿美元,成为新加坡最大的独立科技创业公司之一。
          文章转载自领创集团Advance Group,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

          评论