暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

kafka定时轮设计

RiverLee 2024-06-06
128

# 定时轮


[TOC]


## Hierarchical Timing Wheels


**Hierarchical Timing Wheels** 是一种简单的定时轮算法,它是一个由定时任务桶组成的循环列表。设时间单位为 u。一个大小为 n 的定时轮有 n 个桶,并且可以容纳在 n * u 的时间间隔内的定时任务。每个桶保存落入相应时间范围内的定时任务。在开始时,第一个桶保存 [0, u) 时间段的任务,第二个桶保存 [u, 2u) 时间段的任务,依此类推,第 n 个桶保存 [u * (n - 1), u * n) 时间段的任务。每经过一个时间单位 u,定时器就会跳转到下一个桶并将其中所有的定时任务过期。因此,定时器不会将任务插入到当前时间的桶中,因为它们已经过期。定时器会立即执行已过期的任务。然后,已清空的桶可用于下一轮,因此如果当前桶的时间为 t,则经过一个 tick 后,它将变为时间范围为 [t + u * n, t + (n + 1) * u) 的桶。定时轮的插入/删除(启动定时器/停止定时器)成本为 O(1),而基于优先级队列的定时器,如 java.util.concurrent.DelayQueue 和 java.util.Timer,则具有 O(log n) 的插入/删除成本。


简单定时轮的一个主要缺点是它假设定时器请求在当前时间的 n * u 时间间隔内。如果定时器请求超出了此间隔,则发生溢出。**Hierarchical Timing Wheels** 用于处理此类溢出。它是一种按层次组织的定时轮。最低层具有最精细的时间分辨率。随着层次的提升,时间分辨率变得更粗糙。如果某一级别的时间分辨率为 u,大小为 n,则下一级别的时间分辨率应为 n * u。在每个级别,溢出被委托给较高级别的定时轮。当较高级别的定时轮进行滴答时,它会重新插入定时任务到较低级别的轮中。可以按需创建溢出轮。当溢出桶中的任务过期时,所有任务都会递归地重新插入到定时器中。然后,这些任务会移到更细粒度的轮中或执行。插入(启动定时器)成本为 O(m),其中 m 是轮的数量,通常与系统中的请求数量相比非常小,而删除(停止定时器)成本仍为 O(1)。


例如,假设 u 为 1,n 为 3。如果开始时间为 c,则不同级别的桶如下所示:


```

级别 桶

1 [c,c] [c+1,c+1] [c+2,c+2]

2 [c,c+2] [c+3,c+5] [c+6,c+8]

3 [c,c+8] [c+9,c+17] [c+18,c+26]

```


桶的过期时间是桶的开始时间。因此,在时间 c+1,桶 [c,c]、[c,c+2] 和 [c,c+8] 过期。级别 1 的时钟移到 c+1,并创建 [c+3,c+3]。级别 2 和级别 3 的时钟保持在 c,因为它们的时钟分别以 3 和 9 为单位移动。因此,在级别 2 和 3 中不会创建新的桶。


请注意,级别 2 中的桶 [c,c+2] 不会收到任何任务,因为该范围已在级别 1 中覆盖。对于级别 3 中的桶 [c,c+8] 也是如此,因为它的范围在级别 2 中已经覆盖。这可能有点浪费,但简化了实现。


```

1 [c+1,c+1] [c+2,c+2] [c+3,c+3]

2 [c,c+2] [c+3,c+5] [c+6,c+8]

3 [c,c+8] [c+9,c+17] [c+18,c+26]

```


在时间 c+2,[c+1,c+1] 是新过期的。级别 1 移动到 c+2,并创建 [c+4,c+4]。


```

1 [c+2,c+2] [c+3,c+3] [c+4,c+4]

2 [c,c+2] [c+3,c+5] [c+6,c+8]

3 [c,c+8] [c+9,c+17] [c+18,c+26]

```


在时间 c+3,[c+2,c+2] 是新过期的。级别 2 移动到 c+3,并创建 [c+5,c+5] 和 [c+9,c+11]。级别 3 保持在 c。


```

1 [c+3,c+3] [c+4,c+4] [c+5,c+5]

2 [c+3,c+5] [c+6,c+8] [c+9,c+11]

3 [c,c+8] [c+9,c+17] [c+18,c+26]

```


当操作在超时之前完成时,层次定时轮的效果特别好。即使所有任务都超时,当定时器中有很多项时,它仍然具有优势。它的插入成本(包括重新插入)和删除成本分别为 O(m) m是轮数和 O(1),而基于优先级队列的定时器在插入和删除上



## TimingWheel


```plantuml

@startuml

class TimingWheel {

- tickMs: long

- startMs: long

- wheelSize: int

- taskCounter: AtomicInteger

- queue: DelayQueue<TimerTaskList>

- interval: long

- buckets: TimerTaskList[]

- currentTimeMs: long

- overflowWheel: TimingWheel


+ TimingWheel(tickMs: long, wheelSize: int, startMs: long, taskCounter: AtomicInteger, queue: DelayQueue<TimerTaskList>)


}


class TimerTaskList {

- expiration: long

- tasks: Queue<TimerTask>


+ TimerTaskList(expiration: long)


+ add(task: TimerTask): boolean

+ remove(task: TimerTask): boolean

+ flush(taskExecutor: Executor): void

}



class DelayQueue<E> {


}



class TimerTask {

+ final long delayMs;

- volatile TimerTaskEntry timerTaskEntry

+ TimerTask(long delayMs)

}


TimingWheel "1" --> "*" TimerTaskList : contains

TimingWheel "1" --> "1" TimingWheel : overflowWheel

TimingWheel "1" --> "1" DelayQueue : queue

TimingWheel "1" --> "*" TimerTask : addTask

TimerTaskList "1" --> "*" TimerTask : tasks

@enduml

```


### 构造方法


```java

TimingWheel(

long tickMs,

int wheelSize,

long startMs,

AtomicInteger taskCounter,

DelayQueue<TimerTaskList> queue

) {

this.tickMs = tickMs;

this.startMs = startMs;

this.wheelSize = wheelSize;

this.taskCounter = taskCounter;

this.queue = queue;

this.buckets = new TimerTaskList[wheelSize];

this.interval = tickMs * wheelSize;

// rounding down to multiple of tickMs

this.currentTimeMs = startMs - (startMs % tickMs);


for (int i = 0; i < buckets.length; i++) {

buckets[i] = new TimerTaskList(taskCounter);

}

}

```


`TimingWheel`维护起始时间,当前时间,整个时间轮间隔和单位时间。到期时将TimerTaskList放入`DelayQueue`,保持`DelayQueue`尽量小。用`TimerTaskList[] buckets`维护未到期的TimerTaskList,插入都是O(1)。


### add方法


```java

public boolean add(TimerTaskEntry timerTaskEntry) {

long expiration = timerTaskEntry.expirationMs;


if (timerTaskEntry.cancelled()) {

// Cancelled

return false;

} else if (expiration < currentTimeMs + tickMs) {

// Already expired

return false;

} else if (expiration < currentTimeMs + interval) {

// Put in its own bucket

long virtualId = expiration / tickMs;

int bucketId = (int) (virtualId % (long) wheelSize);

TimerTaskList bucket = buckets[bucketId];

bucket.add(timerTaskEntry);


// Set the bucket expiration time

if (bucket.setExpiration(virtualId * tickMs)) {

// The bucket needs to be enqueued because it was an expired bucket

// We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced

// and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle

// will pass in the same value and hence return false, thus the bucket with the same expiration will not

// be enqueued multiple times.

queue.offer(bucket);

}


return true;

} else {

// Out of the interval. Put it into the parent timer

if (overflowWheel == null) addOverflowWheel();

return overflowWheel.add(timerTaskEntry);

}

}


private synchronized void addOverflowWheel() {

if (overflowWheel == null) {

overflowWheel = new TimingWheel(

interval,

wheelSize,

currentTimeMs,

taskCounter,

queue

);

}

}

```


1. `TimerTask`如果已经取消或者过去,就不插入add返回false

2. 如果未过期并且处于当前时间轮的范围内。那么就计算出这个Task过期所处的桶,加入到这个桶中。设置桶的过期值,如果旧的值与新的值不一样说明这个桶过期了,需要加入队列。只有当过期时间发生变化时,才需要将桶重新添加到队列中。换句话说,如果桶的过期时间没有变化,不需要重复加入队列。在同一个轮周期内进一步调用设置相同的过期时间将传递相同的值,因此会返回false。因此,具有相同过期时间的桶不会被多次加入队列。每个桶第一次插入任务都会加入到队列,初始化expiration都是-1,第一次比较肯定不一样。

3. 如果任务过期时间超过当前轮访问,那么使用下层大范围时间轮


### 时间推进


```java

public void advanceClock(long timeMs) {

if (timeMs >= currentTimeMs + tickMs) {

currentTimeMs = timeMs - (timeMs % tickMs);


// Try to advance the clock of the overflow wheel if present

if (overflowWheel != null) overflowWheel.advanceClock(currentTimeMs);

}

}

```


通过调用`TimingWheel`的`advanceClock`方法来推进时间轮维护的时间。


## 结合SystemTimer


### SystemTimer的构造函数


```java

public SystemTimer(

String executorName,

long tickMs,

int wheelSize,

long startMs

) {

this.taskExecutor = Executors.newFixedThreadPool(1,

runnable -> KafkaThread.nonDaemon("executor-" + executorName, runnable));

this.delayQueue = new DelayQueue<>();

this.taskCounter = new AtomicInteger(0);

this.timingWheel = new TimingWheel(

tickMs,

wheelSize,

startMs,

taskCounter,

delayQueue

);

}

```


`TimingWheel`使用的延迟队列是SystemTimer传入的,共同的一个对象。SystemTimer也维护起始时间和执行任务的线程池。



### SystemTimer的add方法


```java

public void add(TimerTask timerTask) {

readLock.lock();

try {

addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs()));

} finally {

readLock.unlock();

}

}


private void addTimerTaskEntry(TimerTaskEntry timerTaskEntry) {

if (!timingWheel.add(timerTaskEntry)) {

// Already expired or cancelled

if (!timerTaskEntry.cancelled()) {

taskExecutor.submit(timerTaskEntry.timerTask);

}

}

}

```


`SystemTimer`使用读写锁来保证线程安全。通过将task加入`TimingWheel`是否返回true或false可以判断任务是否到期可以执行,到期就放入线程池执行。



### SystemTimer推进时间轮


```java

public boolean advanceClock(long timeoutMs) throws InterruptedException {

TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);

if (bucket != null) {

writeLock.lock();

try {

while (bucket != null) {

timingWheel.advanceClock(bucket.getExpiration());

bucket.flush(this::addTimerTaskEntry);

bucket = delayQueue.poll();

}

} finally {

writeLock.unlock();

}

return true;

} else {

return false;

}

}

```


通过循环调用SystemTimer的`advanceClock`方法可以不断推进时间轮的执行,比如:


```java

while (timer.advanceClock(2000)) { }

```

SystemTimer的`advanceClock`方法会从延迟队列中取出到期的桶(如果有,没有等待超时时间)。再将桶中的每个TimerTask重新调用时间轮的add方法,如果TimerTask还没过期会重新回到时间轮合适的桶中,如果过期了那么就放入执行线程池进行执行。

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论