# 定时轮
[TOC]
## Hierarchical Timing Wheels
**Hierarchical Timing Wheels** 是一种简单的定时轮算法,它是一个由定时任务桶组成的循环列表。设时间单位为 u。一个大小为 n 的定时轮有 n 个桶,并且可以容纳在 n * u 的时间间隔内的定时任务。每个桶保存落入相应时间范围内的定时任务。在开始时,第一个桶保存 [0, u) 时间段的任务,第二个桶保存 [u, 2u) 时间段的任务,依此类推,第 n 个桶保存 [u * (n - 1), u * n) 时间段的任务。每经过一个时间单位 u,定时器就会跳转到下一个桶并将其中所有的定时任务过期。因此,定时器不会将任务插入到当前时间的桶中,因为它们已经过期。定时器会立即执行已过期的任务。然后,已清空的桶可用于下一轮,因此如果当前桶的时间为 t,则经过一个 tick 后,它将变为时间范围为 [t + u * n, t + (n + 1) * u) 的桶。定时轮的插入/删除(启动定时器/停止定时器)成本为 O(1),而基于优先级队列的定时器,如 java.util.concurrent.DelayQueue 和 java.util.Timer,则具有 O(log n) 的插入/删除成本。
简单定时轮的一个主要缺点是它假设定时器请求在当前时间的 n * u 时间间隔内。如果定时器请求超出了此间隔,则发生溢出。**Hierarchical Timing Wheels** 用于处理此类溢出。它是一种按层次组织的定时轮。最低层具有最精细的时间分辨率。随着层次的提升,时间分辨率变得更粗糙。如果某一级别的时间分辨率为 u,大小为 n,则下一级别的时间分辨率应为 n * u。在每个级别,溢出被委托给较高级别的定时轮。当较高级别的定时轮进行滴答时,它会重新插入定时任务到较低级别的轮中。可以按需创建溢出轮。当溢出桶中的任务过期时,所有任务都会递归地重新插入到定时器中。然后,这些任务会移到更细粒度的轮中或执行。插入(启动定时器)成本为 O(m),其中 m 是轮的数量,通常与系统中的请求数量相比非常小,而删除(停止定时器)成本仍为 O(1)。
例如,假设 u 为 1,n 为 3。如果开始时间为 c,则不同级别的桶如下所示:
```
级别 桶
1 [c,c] [c+1,c+1] [c+2,c+2]
2 [c,c+2] [c+3,c+5] [c+6,c+8]
3 [c,c+8] [c+9,c+17] [c+18,c+26]
```
桶的过期时间是桶的开始时间。因此,在时间 c+1,桶 [c,c]、[c,c+2] 和 [c,c+8] 过期。级别 1 的时钟移到 c+1,并创建 [c+3,c+3]。级别 2 和级别 3 的时钟保持在 c,因为它们的时钟分别以 3 和 9 为单位移动。因此,在级别 2 和 3 中不会创建新的桶。
请注意,级别 2 中的桶 [c,c+2] 不会收到任何任务,因为该范围已在级别 1 中覆盖。对于级别 3 中的桶 [c,c+8] 也是如此,因为它的范围在级别 2 中已经覆盖。这可能有点浪费,但简化了实现。
```
1 [c+1,c+1] [c+2,c+2] [c+3,c+3]
2 [c,c+2] [c+3,c+5] [c+6,c+8]
3 [c,c+8] [c+9,c+17] [c+18,c+26]
```
在时间 c+2,[c+1,c+1] 是新过期的。级别 1 移动到 c+2,并创建 [c+4,c+4]。
```
1 [c+2,c+2] [c+3,c+3] [c+4,c+4]
2 [c,c+2] [c+3,c+5] [c+6,c+8]
3 [c,c+8] [c+9,c+17] [c+18,c+26]
```
在时间 c+3,[c+2,c+2] 是新过期的。级别 2 移动到 c+3,并创建 [c+5,c+5] 和 [c+9,c+11]。级别 3 保持在 c。
```
1 [c+3,c+3] [c+4,c+4] [c+5,c+5]
2 [c+3,c+5] [c+6,c+8] [c+9,c+11]
3 [c,c+8] [c+9,c+17] [c+18,c+26]
```
当操作在超时之前完成时,层次定时轮的效果特别好。即使所有任务都超时,当定时器中有很多项时,它仍然具有优势。它的插入成本(包括重新插入)和删除成本分别为 O(m) m是轮数和 O(1),而基于优先级队列的定时器在插入和删除上
## TimingWheel
```plantuml
@startuml
class TimingWheel {
- tickMs: long
- startMs: long
- wheelSize: int
- taskCounter: AtomicInteger
- queue: DelayQueue<TimerTaskList>
- interval: long
- buckets: TimerTaskList[]
- currentTimeMs: long
- overflowWheel: TimingWheel
+ TimingWheel(tickMs: long, wheelSize: int, startMs: long, taskCounter: AtomicInteger, queue: DelayQueue<TimerTaskList>)
}
class TimerTaskList {
- expiration: long
- tasks: Queue<TimerTask>
+ TimerTaskList(expiration: long)
+ add(task: TimerTask): boolean
+ remove(task: TimerTask): boolean
+ flush(taskExecutor: Executor): void
}
class DelayQueue<E> {
}
class TimerTask {
+ final long delayMs;
- volatile TimerTaskEntry timerTaskEntry
+ TimerTask(long delayMs)
}
TimingWheel "1" --> "*" TimerTaskList : contains
TimingWheel "1" --> "1" TimingWheel : overflowWheel
TimingWheel "1" --> "1" DelayQueue : queue
TimingWheel "1" --> "*" TimerTask : addTask
TimerTaskList "1" --> "*" TimerTask : tasks
@enduml
```
### 构造方法
```java
TimingWheel(
long tickMs,
int wheelSize,
long startMs,
AtomicInteger taskCounter,
DelayQueue<TimerTaskList> queue
) {
this.tickMs = tickMs;
this.startMs = startMs;
this.wheelSize = wheelSize;
this.taskCounter = taskCounter;
this.queue = queue;
this.buckets = new TimerTaskList[wheelSize];
this.interval = tickMs * wheelSize;
// rounding down to multiple of tickMs
this.currentTimeMs = startMs - (startMs % tickMs);
for (int i = 0; i < buckets.length; i++) {
buckets[i] = new TimerTaskList(taskCounter);
}
}
```
`TimingWheel`维护起始时间,当前时间,整个时间轮间隔和单位时间。到期时将TimerTaskList放入`DelayQueue`,保持`DelayQueue`尽量小。用`TimerTaskList[] buckets`维护未到期的TimerTaskList,插入都是O(1)。
### add方法
```java
public boolean add(TimerTaskEntry timerTaskEntry) {
long expiration = timerTaskEntry.expirationMs;
if (timerTaskEntry.cancelled()) {
// Cancelled
return false;
} else if (expiration < currentTimeMs + tickMs) {
// Already expired
return false;
} else if (expiration < currentTimeMs + interval) {
// Put in its own bucket
long virtualId = expiration / tickMs;
int bucketId = (int) (virtualId % (long) wheelSize);
TimerTaskList bucket = buckets[bucketId];
bucket.add(timerTaskEntry);
// Set the bucket expiration time
if (bucket.setExpiration(virtualId * tickMs)) {
// The bucket needs to be enqueued because it was an expired bucket
// We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced
// and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle
// will pass in the same value and hence return false, thus the bucket with the same expiration will not
// be enqueued multiple times.
queue.offer(bucket);
}
return true;
} else {
// Out of the interval. Put it into the parent timer
if (overflowWheel == null) addOverflowWheel();
return overflowWheel.add(timerTaskEntry);
}
}
private synchronized void addOverflowWheel() {
if (overflowWheel == null) {
overflowWheel = new TimingWheel(
interval,
wheelSize,
currentTimeMs,
taskCounter,
queue
);
}
}
```
1. `TimerTask`如果已经取消或者过去,就不插入add返回false
2. 如果未过期并且处于当前时间轮的范围内。那么就计算出这个Task过期所处的桶,加入到这个桶中。设置桶的过期值,如果旧的值与新的值不一样说明这个桶过期了,需要加入队列。只有当过期时间发生变化时,才需要将桶重新添加到队列中。换句话说,如果桶的过期时间没有变化,不需要重复加入队列。在同一个轮周期内进一步调用设置相同的过期时间将传递相同的值,因此会返回false。因此,具有相同过期时间的桶不会被多次加入队列。每个桶第一次插入任务都会加入到队列,初始化expiration都是-1,第一次比较肯定不一样。
3. 如果任务过期时间超过当前轮访问,那么使用下层大范围时间轮
### 时间推进
```java
public void advanceClock(long timeMs) {
if (timeMs >= currentTimeMs + tickMs) {
currentTimeMs = timeMs - (timeMs % tickMs);
// Try to advance the clock of the overflow wheel if present
if (overflowWheel != null) overflowWheel.advanceClock(currentTimeMs);
}
}
```
通过调用`TimingWheel`的`advanceClock`方法来推进时间轮维护的时间。
## 结合SystemTimer
### SystemTimer的构造函数
```java
public SystemTimer(
String executorName,
long tickMs,
int wheelSize,
long startMs
) {
this.taskExecutor = Executors.newFixedThreadPool(1,
runnable -> KafkaThread.nonDaemon("executor-" + executorName, runnable));
this.delayQueue = new DelayQueue<>();
this.taskCounter = new AtomicInteger(0);
this.timingWheel = new TimingWheel(
tickMs,
wheelSize,
startMs,
taskCounter,
delayQueue
);
}
```
`TimingWheel`使用的延迟队列是SystemTimer传入的,共同的一个对象。SystemTimer也维护起始时间和执行任务的线程池。
### SystemTimer的add方法
```java
public void add(TimerTask timerTask) {
readLock.lock();
try {
addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs()));
} finally {
readLock.unlock();
}
}
private void addTimerTaskEntry(TimerTaskEntry timerTaskEntry) {
if (!timingWheel.add(timerTaskEntry)) {
// Already expired or cancelled
if (!timerTaskEntry.cancelled()) {
taskExecutor.submit(timerTaskEntry.timerTask);
}
}
}
```
`SystemTimer`使用读写锁来保证线程安全。通过将task加入`TimingWheel`是否返回true或false可以判断任务是否到期可以执行,到期就放入线程池执行。
### SystemTimer推进时间轮
```java
public boolean advanceClock(long timeoutMs) throws InterruptedException {
TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
if (bucket != null) {
writeLock.lock();
try {
while (bucket != null) {
timingWheel.advanceClock(bucket.getExpiration());
bucket.flush(this::addTimerTaskEntry);
bucket = delayQueue.poll();
}
} finally {
writeLock.unlock();
}
return true;
} else {
return false;
}
}
```
通过循环调用SystemTimer的`advanceClock`方法可以不断推进时间轮的执行,比如:
```java
while (timer.advanceClock(2000)) { }
```
SystemTimer的`advanceClock`方法会从延迟队列中取出到期的桶(如果有,没有等待超时时间)。再将桶中的每个TimerTask重新调用时间轮的add方法,如果TimerTask还没过期会重新回到时间轮合适的桶中,如果过期了那么就放入执行线程池进行执行。




