特别注意:定时操作和延时操作是两个不同的概念。定时操作是指在特定时间之后执行的操作,时间轮用来管理定时操作。而延时操作可以在超时时间之前的任意时间点执行。因此,延时操作需要能够支持外部事件的触发。
比如延时生产操作,它的外部事件是所要写入消息的某个分区的 HW 发生增长。也就是说,随着 follower 副本不断地与 leader 副本进行消息同步,进而促使HW进一步增长,HW 每增长一次都会检测是否能够完成此次延时生产操作。
Kafka 对延时生产请求的处理

1. Timer 和 SystemTimer
Timer 接口:
trait Timer {将给定的定时任务插入到时间轮上,等待后续延迟执行def add(timerTask: TimerTask): Unit向前推进时钟,执行已达过期时间的延迟任务def advanceClock(timeoutMs: Long): Boolean获取时间轮上总的定时任务数def size: Int关闭定时器def shutdown(): Unit}
Timer 接口定义了四个方法:
add:将给定的定时任务插入到时间轮
advanceClock:向前推进时间轮,执行已到达过期时间的延时任务
size:获取时间轮上总的定时任务数
shutdown:关闭定时器服务
SystemTimer 类:
该类是 Timer 接口的实现,是一个定时器,内部封装了时间轮对象,为 Purgatory 延时操作管理器提供延时请求的管理功能。保存了因为不满足条件而无法完成,但是又没有超时的请求。
@threadsafeclass SystemTimer(executorName: String,//Purgatory 的名字。Kafka 中存在不同的 PurgatorytickMs: Long = 1,//时间轮中时间格的跨度wheelSize: Int = 20,//时间轮上时间格的数量startMs: Long = Time.SYSTEM.hiResClockMs//该 SystemTimer 定时器启动时间,单位是毫秒。) extends Timer {// 单线程的线程池用于异步执行定时任务private[this] val taskExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {def newThread(runnable: Runnable): Thread =KafkaThread.nonDaemon("executor-"+executorName, runnable)})// 延迟队列保存所有Bucket,即所有TimerTaskList对象private[this] val delayQueue = new DelayQueue[TimerTaskList]()// 总定时任务数private[this] val taskCounter = new AtomicInteger(0)//时间轮对象private[this] val timingWheel = new TimingWheel(tickMs = tickMs,wheelSize = wheelSize,startMs = startMs,taskCounter = taskCounter,delayQueue)// 维护线程安全的读写锁private[this] val readWriteLock = new ReentrantReadWriteLock()private[this] val readLock = readWriteLock.readLock()private[this] val writeLock = readWriteLock.writeLock()...}
构造方法中的 tickMs 和 wheelSize 参数之前已经分析过,分别代表了时间轮上每个时间格的跨度和时间格的数量,它们的乘积就是本层时间轮的跨度。executorName 表示 Purgatory 的名称,Kafka 中包含了多种 Purgatory。startMs 是该定时器启动时的时间戳,单位 ms。
除此之外,SystemTimer 中还定义了几个关键的变量,含义分别如下:
delayQueue:保存了当前定时器下管理的所有时间格 bucket(TimerTaskList 类型),是一个 DelayQueue 对象,所以只有在 bucket 过期后,才能从该队列中获取到。
timingWheel:时间轮对象。SystemTimer 通过该变量来操作时间轮
taskExecutor:单线程的线程池,用于在定时任务到期后异步执行任务逻辑
① add:将给定的定时任务插入时间轮,内部调用了 addTimerTaskEntry 方法
def add(timerTask: TimerTask): Unit = {// 获取读锁。在没有线程持有写锁的前提下,// 多个线程能够同时向时间轮添加定时任务readLock.lock()try {// 调用addTimerTaskEntry执行插入逻辑addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs))} finally {//释放读锁readLock.unlock()}}
② addTimerTaskEntry:
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {if (!timingWheel.add(timerTaskEntry)) {// 定时任务未取消,说明定时任务已过期,就需要添加到执行任务的线程池等待执行// 否则timingWheel.add方法应该返回Trueif (!timerTaskEntry.cancelled)taskExecutor.submit(timerTaskEntry.timerTask)}}
该方法内调用了时间轮 TimingWheel.add 方法:
如果返回 true,说明插入定时任务成功,if 条件不满足,方法结束;
如果返回 false,会继续判断任务被取消了还是任务过期了
如果是取消了,则什么都不做;
如果是任务过期了,则将该任务提交到线程池等待执行任务逻辑
③ advanceClock:向前驱动时钟,说白了就是将时间轮的 currentTime 指针向前嘀嗒指定的时间格
def advanceClock(timeoutMs: Long): Boolean = {//获取delayQueue中下一个已过期的Bucket,只有已经过期的任务才会执行,否则还是保存在时间轮中var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)if (bucket != null) {//获取写锁// 一旦有线程持有写锁,其他任何线程执行add或advanceClock方法时会阻塞writeLock.lock()try {while (bucket != null) {// 推动时间轮向前"滚动"到Bucket的过期时间点timingWheel.advanceClock(bucket.getExpiration())// 将该Bucket下的所有定时任务重写回到时间轮,这是因为部分任务会进行降级bucket.flush(reinsert)// 读取下一个Bucket对象bucket = delayQueue.poll()}} finally {// 释放写锁writeLock.unlock()}true} else {false}}
该方法的逻辑是:遍历 delayQueue 中的所有 Bucket,并将时间轮的时钟依次推进到它们的过期时间点,令它们过期。然后,再将这些 Bucket 下的所有定时任务全部重新插入回时间轮。插入回时间轮的时候,就会涉及到时间轮的降级。(关于时间轮的降级,可以看上一篇)
2. DelayedOperation
abstract class DelayedOperation(override val delayMs: Long,//超时时间,一般是客户端请求的超时时间,即request.timeout.mslockOpt: Option[Lock] = None) extends TimerTask with Logging {// 标识该延迟操作是否已经完成private val completed = new AtomicBoolean(false)// 防止多个线程同时检查操作是否可完成时发生锁竞争导致操作最终超时private val tryCompletePending = new AtomicBoolean(false)private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)...}
可以看到,DelayedOperation 类是一个抽象类,构造方法中的 delayMs 表示超时时间,一般是客户端 request.timeout.ms 参数的值。该类定义了两个关键的变量:
completed:标识该延迟操作是否已经完成,默认为 false
tryCompletePending:标识拿到锁的线程是否有机会再次检查条件是否已经满足,防止多个线程同时检查操作是否可完成时发生锁竞争导致操作最终超时。
这个参数是在 1.1 版本引入的。在此之前,只有 completed 参数。但是,这样就可能存在这样一个问题:当多个线程同时检查某个延迟操作是否满足完成条件时,如果其中一个线程持有了锁(也就是上面的 lock 字段),然后执行条件检查,会发现不满足完成条件。而与此同时,另一个线程执行检查时却发现条件满足了,但是这个线程又没有拿到锁,此时,该延迟操作将永远不会有再次被检查的机会,会导致最终超时。
forceComplete:强制完成延迟操作,不管它是否满足完成条件。每当操作满足完成条件或已经过期了,就需要调用该方法完成该操作。 isCompleted:检查延迟操作是否已经完成。源码使用这个方法来决定后续如何处理该操作。比如如果操作已经完成了,那么通常需要取消该操作。 onExpiration:强制完成之后执行的过期逻辑回调方法。只有真正完成操作的那个线程才有资格调用这个方法。 onComplete:完成延迟操作所需的处理逻辑。这个方法只会在 forceComplete 方法中被调用。 tryComplete:尝试完成延迟操作的顶层方法,内部会调用 forceComplete 方法。 maybeTryComplete:线程安全版本的 tryComplete 方法。现在外部代码调用的都是这个方法了。 run:调用延迟操作超时后的过期逻辑,也就是组合调用 forceComplete + onExpiration。
private[server] def maybeTryComplete(): Boolean = {//标记是否需要重试var retry = false//标记延时操作是否已经完成var done = falsedo {//尝试获取锁对象if (lock.tryLock()) {try {tryCompletePending.set(false)//尝试完成延迟操作done = tryComplete()} finally {lock.unlock()}// 运行到这里的线程持有锁,其他线程只能运行else分支的代码// 如果其他线程将maybeTryComplete设置为true,那么retry=true// 这就相当于其他线程给了本线程重试的机会retry = tryCompletePending.get()} else {// 运行到这里的线程没有拿到锁// 设置tryCompletePending=true给持有锁的线程一个重试的机会retry = !tryCompletePending.getAndSet(true)}} while (!isCompleted && retry)done}
如果拿到了锁,则清空 tryCompletePending 状态,尝试完成延时请求,最后释放锁并读取最新的 retry 状态
如果没有拿到锁,则设置 tryCompletePending 状态,来间接影响 retry 值,从而给获取到锁的线程一个重试的机会。重试通过 do...while 循环实现
3. DelayedOperationPurgatory
final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,//Purgatory名称timeoutTimer: Timer,//定时器,里面封装了分层时间轮brokerId: Int = 0,//broker编号purgeInterval: Int = 1000,//控制删除线程移除 Bucket 中的过期延迟请求的频率,默认为1秒reaperEnabled: Boolean = true,//是否启动删除线程,默认为truetimerEnabled: Boolean = true)//是否启用分层时间轮,默认为trueextends Logging with KafkaMetricsGroup {private val watcherLists = Array.fill[WatcherList](DelayedOperationPurgatory.Shards)(new WatcherList)//管理器中预估的总的延时请求数private[this] val estimatedTotalOperations = new AtomicInteger(0)//过期操作收割机线程private val expirationReaper = new ExpiredOperationReaper()...}
purgatoryName:延时操作管理器的名称
timeoutTimer:定时器对象
brokerId:Broker 的编号
purgeInterval:用于控制收割机线程移除 Bucket 中的过期延迟请求的频率,在绝大部分情况下,都是 1 秒一次。可以通过生产者参数进行配置:producer.purgatory.purge.interval.requests
reaperEnabled:是否开启收割机线程,默认开启
timerEnabled:是否采用分层时间轮,默认采用
watcherLists:
estimatedTotalOperations:管理器中总的延时请求数
expirationReaper :过期操作收割机线程对象,会定期调用 advanceClock 方法驱动时间轮向前推进
在 DelayedOperationPurgatory 类里面,定义了三个内部类:
WatcherList:
private class WatcherList {// 定义一组按照Key分组的Watchers对象,Key 可以是任何类型,而 Value 就是 Key 对应类型的一组 Watchers 对象。val watchersByKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))val watchersLock = new ReentrantLock()// 返回所有Watchers对象def allWatchers = {watchersByKey.values}}
private class Watchers(val key: Any) {//延迟请求的链表private[this] val operations = new ConcurrentLinkedQueue[T]()...}
Watchers 的本质是一个链表,它提供的方法主要就是操作链表的方法,比如tryCompleteWatched 方法会遍历整个链表,并尝试完成其中的延迟请求。
def tryCompleteWatched(): Int = {//完成的请求数var completed = 0val iter = operations.iterator()//遍历保存延时请求的链表while (iter.hasNext) {val curr = iter.next()if (curr.isCompleted) {//如果请求完成了就从链表中移除iter.remove()//如果尝试完成了请求,则将请求从链表中移除,并更新完成的请求数} else if (curr.maybeTryComplete()) {iter.remove()completed += 1}}if (operations.isEmpty)removeKeyIfEmpty(key, this)//返回完成的延时请求数completed}
ExpiredOperationReaper:
private class ExpiredOperationReaper extends ShutdownableThread("ExpirationReaper-%d-%s".format(brokerId, purgatoryName),false) {override def doWork() {advanceClock(200L)}}
DelayedOperationPurgatory 中的两个关键方法:
① tryCompleteElseWatch:检查操作是否能够完成,如果不能的话,就把它加入到对应 Key 所在的 WatcherList 中
def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {assert(watchKeys.nonEmpty, "The watch key list can't be empty")//尝试完成给定的操作var isCompletedByMe = operation.tryComplete()// 如果该延迟请求是由本线程完成的,直接返回true即可if (isCompletedByMe)return truevar watchCreated = false// 遍历所有要监控的Keyfor(key <- watchKeys) {// 再次查看请求的完成状态,如果已经完成,就说明是被其他线程完成的,返回falseif (operation.isCompleted)return false// 否则,将该operation加入到Key所在的WatcherListwatchForOperation(key, operation)// 设置watchCreated标记,表明该任务已经被加入到WatcherListif (!watchCreated) {watchCreated = true// 更新Purgatory中总请求数estimatedTotalOperations.incrementAndGet()}}// 再次尝试完成该延迟请求isCompletedByMe = operation.maybeTryComplete()if (isCompletedByMe)return true// 如果依然不能完成此请求,将其加入到时间轮if (!operation.isCompleted) {if (timerEnabled)timeoutTimer.add(operation)//再次判断,如果该请求已完成,则取消时间轮中的定时任务if (operation.isCompleted) {//取消定时任务operation.cancel()}}false}
该方法首先调用 tryComplete 方法,尝试完成延迟请求,如果返回结果是 true,就说明执行 tryCompleteElseWatch 方法的线程正常地完成了该延迟请求,也就不需要再添加到 WatcherList 了,直接返回 true 就行了。
否则的话,代码会遍历所有要监控的 Key,再次查看请求的完成状态。如果已经完成,就说明是被其他线程完成的,返回 false;如果依然无法完成,则将该请求加入到 Key 所在的 WatcherList 中,等待后续完成。同时,设置 watchCreated 标记,表明该任务已经被加入到 WatcherList ,然后更新 Purgatory 中总请求数。
待遍历完所有 Key 之后,源码会再次尝试完成该延迟请求,如果完成了,就返回 true;如果还是不能完成,且启用了时间轮,则将该任务添加到时间轮中
然后后再次判断这个任务是否被其他线程完成,如果完成了则取消该任务,最后返回 false。
先尝试完成延时请求
如果无法完成,就加入到 WatcherList,等待后面再试
def checkAndComplete(key: Any): Int = {// 获取给定Key对应的WatcherListval wl = watcherList(key)// 获取WatcherList中Key对应的Watchers对象实例,里面存储了需要完成的多个延时请求任务val watchers = inLock(wl.watchersLock) { wl.watchersByKey.get(key) }// 尝试完成满足完成条件的延时请求并返回成功完成的请求数if(watchers == null)0else//重要步骤:调用 Watchers 的 tryCompleteWatched 方法,尝试完成那些已满足完成条件的延迟请求watchers.tryCompleteWatched()}
//是否需要等待其它副本完成写入,通过delayedProduceRequestRequired方法来判断写入是否成功,如果为true,表示需要等待其它副本同步完成消息写入if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {//创建延时生产任务对象val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)//创建DelayedProduce延时请求对象val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq//再一次尝试完成该延时请求//如果暂时无法完成,则将对象放入到相应的Purgatory中等待后续处理delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)} else {//可以立即返回val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)//调用该回调逻辑然后返回responseCallback(produceResponseStatus)}
a. 当前 Broker 不再是分区的 Leader 副本所在节点,会在响应中封装一个错误信息 b. 当前 Broker 是分区的 Leader 副本所在节点: b-1:检查是否所有 Follower 副本完成写入操作时抛出一个本地错误,那么会在响应中封装一个错误信息 b-2:如果所有 Follower 副本完成写入操作,返回正常响应
override def tryComplete(): Boolean = {//遍历请求中的分区,检查写到这些分区的数据是否已经同步到了所有Follower副本上produceMetadata.produceStatus.foreach { case (topicPartition, status) =>trace(s"Checking produce satisfaction for $topicPartition, current status $status")if (status.acksPending) {val (hasEnough, error) = replicaManager.getPartition(topicPartition) match {case Some(partition) =>if (partition eq ReplicaManager.OfflinePartition)(false, Errors.KAFKA_STORAGE_ERROR)elsepartition.checkEnoughReplicasReachOffset(status.requiredOffset)case None =>(false, Errors.UNKNOWN_TOPIC_OR_PARTITION)}if (error != Errors.NONE || hasEnough) {status.acksPending = falsestatus.responseStatus.error = error}}}if (!produceMetadata.produceStatus.values.exists(_.acksPending))//强制完成延时请求,内部会首先会取消时间轮上的任务,然后调用 onComplete 方法forceComplete()elsefalse}
def forceComplete(): Boolean = {if (completed.compareAndSet(false, true)) {//取消时间轮上的定时任务cancel()//完成延时操作onComplete()true} else {false}}
override def onComplete() {val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus)//执行回调逻辑responseCallback(responseStatus)}
首先会尝试完成该请求,如果完成了则直接返回。 如果无法完成,会将该请求放到延时任务管理器中,等待外部事件触发,同时,会将该延时请求封装成一个任务放到时间轮中进行管理。 在该请求超时之前,一旦有外部事件触发,就会尝试执行,如果完成了,就会取消时间轮上对应的定时任务,然后调用回调函数返回。 如果直到请求超时也无法完成,则会封装对应的异常信息并返回,此时该任务也会从时间轮上移除。
注意:在整个流程中,延时请求会在两个地方进行维护,一个是 DelayedOperationPurgatory 延时操作管理器,另一个是时间轮。延时操作管理器会根据外部事件的触发来不断尝试完成延时请求。而时间轮的作用更像是一个保险,如果超时之前该请求完成了,那么会取消时间轮上的定时任务;如果直到超时也无法完成,这时也会将任务从时间轮移除,并返回对应的超时异常。
极客时间《Kafka核心源码解读》
《深入理解Kafka核心设计与实践原理》




