
时间轮算法解决的问题
但是这么做有一个问题,比如我们有100任务,那么定时任务队列就每次要扫100任务来确定任务是否要执行,这样会造成大量的空轮询。


时间轮算法
时间轮算法中轮询不在遍历所有任务,而是遍历时间刻度,当发现某一刻度上有任务(队列不为空)时,就会执行当前刻度上的所有任务。
使用场景
延迟队列的使用场景,时间轮都可以使用,比如:
心跳检测(维护TCP连接, 30秒没有访问就将连接断开) 会话、请求超时 消息延迟推送 业务超时订单(电商创建订单后锁定产品,30分钟内支付)
时间刻度问题
如果按1秒为一个时间刻度,那么一天会有86400个刻度,当我添加两个任务,一个是10秒后执行,另一个是86000秒后执行,那么其中大部分轮询都是空轮询,而且会浪费内存空间(每个刻度都有自己的任务队列)
基于round的时间轮算法
现在我们不增加刻度,而是通过添加round属性来描述任务。假设我们时间大小设置为20,我们添加三个任务:
10秒后执行的任务
30秒后执行的任务 50秒后执行的任务

那么这三个任务都在10刻度的任务队列中,分别为round=0, round=1, round=2。记 n 秒后执行任务, m为时间刻度,round计算方式为 n m, 任务位于 n % m 的任务队列中。
时间轮每移动一个刻度时,遍历任务队列取出round=0的任务执行,然后将其余的任务 round-1。
基于round的时间轮虽然解决了浪费内存空间的问题,但当时间刻度小,任务队列长的时候会增加耗时。
分层时间轮算法
与基于round的时间轮不同,分层时间轮采用层级联动的方式,具有以下特点:
不做遍历计算round,每一个刻度中的任务都是应该执行的。
当任务执行时间超过当前刻度范围时,进入下一层时间轮的范围。 定时任务通过升级和降级来转移队列中的位置。

时间轮升级
时间轮通过升级和降级来联动层级,执行时间超过当前层级最大刻度时,就会进行升级,进入下一层时间轮。
我们先创建三个任务,时间最大刻度设置为10,每个刻度间隔为1秒。
2秒后执行的任务
12秒后执行的任务 13秒后秒执行的任务
第一个任务会添加到第一层时间轮的 2 队列中去(每个刻度都有自己的队列)而第二,第三个任务因为超过10秒,就会升级到的第二层的 1 队列中,下面是代码。
type TimingWheel struct {ticker *time.TickertickMs int // 时间刻度wheelSize int // 时间轮大小currentPosition int // 轮盘上的指针位置level int // 时间轮层数prevWheel *TimingWheel // 上层时间轮nextWheel *TimingWheel // 下层时间轮slots []*Slot // 队列stopChannel chan bool}func (e *TimingWheel) addTask(duration int, task *Task) {// 如果添加的刻度大于当前的时间轮刻度, 进入下一轮if duration > e.wheelSize*e.tickMs {if e.nextWheel == nil {newTw := newTimingWheel(e.tickMs*e.wheelSize,e.wheelSize,e.level+1,e,)e.nextWheel = newTw}e.nextWheel.addTask(duration-e.wheelSize*e.tickMs, task)} else {e.moveTask(duration, task)}}func (e *TimingWheel) AddTask(duration int, job func()) {expiration := GetNowTimestamp() + int64(duration)e.addTask(duration, &Task{expiration, job})}
时间轮降级
时间轮走完一轮之后会判断是否有下一层时间轮,如果有的话就会通过降级把下一层的时间轮中的任务转移到上层。
func (e *TimingWheel) advance() {fmt.Println("advance", e.currentPosition, e.level)slot := e.slots[e.currentPosition]if e.level == 0 {// 表示第一层时间轮// 当前刻度队列中的任务全部取出并执行slot.trigger()} else {// 表示进入到下一层// 将队列中的定时任务取出来tasks := slot.getClear()// 当前时间戳currentTime := GetNowTimestamp()// 将任务转移到上一层队列中for t := tasks.Front(); t != nil; t = t.Next() {t := t.Value.(*Task)e.prevWheel.moveTask(int(t.expiration-currentTime), t)}}// 往前走一步e.currentPosition = (e.currentPosition + 1) % e.wheelSizeif e.currentPosition == 0 {// 当下一层存在时if e.nextWheel != nil {e.nextWheel.advance()}}}// 时间轮添加到队列中的任务func (e *TimingWheel) moveTask(duration int, task *Task) {if duration <= 0 {e.slots[e.currentPosition].add(task)return}// 选择刻度所在的队列位置,并添加任务到队列中index := (int(duration/e.tickMs) + e.currentPosition) % e.wheelSizee.slots[index].add(task)}
上面的例子中,当第一层时间轮走完,也就是过了10秒后,第二,第三个任务只剩下2秒和3秒的等待时间,我们就会把这些任务重新放到第一层时间轮的队列中完成降级。
原本第二,第三个任务在第二层时间轮中在刻度10-19这个刻度的队列中,通过降级把它们分别放到第一层的2、3队列中。
总结
两层时间轮用20个队列就可以处理0-109秒内的定时任务,三层就可以处理0-1009 (具体跟设置的刻度时间和长度有关)。 解决了轮询队列中的round,只要过期时间到了,队列中的任务都是需要处理的。 处理队列任务一般都会额外开启线程,时间轮算法中线程数量只跟层级有关,不会因为队列的增多而改变,减少了资源开销。 大家在处理定时任务时,都会使用什么方法呢?
欢迎在评论区分享你的观点。

感谢阅读「技术创想」第47期文章
领创集团正在春季招聘中
期待你的加入
点击文末
“阅读原文”
获取更多
招聘信息

关于领创集团













