暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

深入理解Kafka服务端之延时请求的处理

大数据记事本 2021-01-28
1543
一、场景分析
    在上一篇中,分析了分层时间轮的原理和相关的类定义及关键方法。可以发现,时间轮部分的代码和 Kafka 本身是解耦的,里面并未涉及到 Kafka 的任何组件。如果我们项目中有管理定时任务的需求,完全可以借鉴这部分代码。
    那么 Kafka 中是在哪里维护时间轮的呢?这里涉及到的就是 Timer 接口以及 SystemTimer 实现类,Timer 接口定义了管理延迟操作的方法,而 SystemTimer 类实现了延迟操作的关键代码,供上层调用。
    这里所说的上层,指的就是 DelayOperationPurgatory 类,我们可以认为它是一个延时操作管理器,在这个管理器内部,封装了一个 SystemTimer 定时器对象,用来操作时间轮。除此之外,为了驱动定时器向前推进,在管理器内还维护了一个 ExpiredOperationReaper 对象,根据字面意思,可以理解为 "过期操作收割机" ,本质上是一个线程对象。
    从这里可以看出,定时器、收割机线程延时操作管理器是一一对应的。


特别注意:定时操作延时操作是两个不同的概念。定时操作是指在特定时间之后执行的操作,时间轮用来管理定时操作。而延时操作可以在超时时间之前的任意时间点执行。因此,延时操作需要能够支持外部事件的触发。  

    比如延时生产操作,它的外部事件是所要写入消息的某个分区的 HW 发生增长。也就是说,随着 follower 副本不断地与 leader 副本进行消息同步,进而促使HW进一步增长,HW 每增长一次都会检测是否能够完成此次延时生产操作。

二、图示说明

Kafka 对延时生产请求的处理

三、源码分析

1. Timer 和 SystemTimer

Timer 接口:

    trait Timer {


    将给定的定时任务插入到时间轮上,等待后续延迟执行
    def add(timerTask: TimerTask): Unit


    向前推进时钟,执行已达过期时间的延迟任务
    def advanceClock(timeoutMs: Long): Boolean


    获取时间轮上总的定时任务数
    def size: Int


    关闭定时器
    def shutdown(): Unit
    }

    Timer 接口定义了四个方法:

    • add:将给定的定时任务插入到时间轮

    • advanceClock:向前推进时间轮,执行已到达过期时间的延时任务

    • size:获取时间轮上总的定时任务数

    • shutdown:关闭定时器服务

    SystemTimer 类:

        该类是 Timer 接口的实现,是一个定时器,内部封装了时间轮对象,为 Purgatory 延时操作管理器提供延时请求的管理功能。保存了因为不满足条件而无法完成,但是又没有超时的请求。

      @threadsafe
      class SystemTimer(executorName: String,//Purgatory 的名字。Kafka 中存在不同的 Purgatory
      tickMs: Long = 1,//时间轮中时间格的跨度
      wheelSize: Int = 20,//时间轮上时间格的数量
      startMs: Long = Time.SYSTEM.hiResClockMs//该 SystemTimer 定时器启动时间,单位是毫秒。
      ) extends Timer {


      // 单线程的线程池用于异步执行定时任务
      private[this] val taskExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
      def newThread(runnable: Runnable): Thread =
      KafkaThread.nonDaemon("executor-"+executorName, runnable)
      })
      // 延迟队列保存所有Bucket,即所有TimerTaskList对象
      private[this] val delayQueue = new DelayQueue[TimerTaskList]()
      // 总定时任务数
      private[this] val taskCounter = new AtomicInteger(0)


      //时间轮对象
      private[this] val timingWheel = new TimingWheel(
      tickMs = tickMs,
      wheelSize = wheelSize,
      startMs = startMs,
      taskCounter = taskCounter,
      delayQueue
      )


      // 维护线程安全的读写锁
      private[this] val readWriteLock = new ReentrantReadWriteLock()
      private[this] val readLock = readWriteLock.readLock()
      private[this] val writeLock = readWriteLock.writeLock()
      ...
      }

      构造方法中的 tickMs wheelSize 参数之前已经分析过,分别代表了时间轮上每个时间格的跨度和时间格的数量,它们的乘积就是本层时间轮的跨度。executorName 表示 Purgatory 的名称,Kafka 中包含了多种 Purgatory。startMs 是该定时器启动时的时间戳,单位 ms。

      除此之外,SystemTimer 中还定义了几个关键的变量,含义分别如下:

      • delayQueue:保存了当前定时器下管理的所有时间格 bucket(TimerTaskList 类型),是一个 DelayQueue 对象,所以只有在 bucket 过期后,才能从该队列中获取到。

      • timingWheel:时间轮对象。SystemTimer 通过该变量来操作时间轮

      • taskExecutor:单线程的线程池,用于在定时任务到期后异步执行任务逻辑

      关键方法:

      add:将给定的定时任务插入时间轮,内部调用了 addTimerTaskEntry 方法

        def add(timerTask: TimerTask): Unit = {
        // 获取读锁。在没有线程持有写锁的前提下,
        // 多个线程能够同时向时间轮添加定时任务
        readLock.lock()
        try {
        // 调用addTimerTaskEntry执行插入逻辑
        addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs))
        } finally {
        //释放读锁
        readLock.unlock()
        }
        }

        addTimerTaskEntry

          private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {


          if (!timingWheel.add(timerTaskEntry)) {


          // 定时任务未取消,说明定时任务已过期,就需要添加到执行任务的线程池等待执行
          // 否则timingWheel.add方法应该返回True
          if (!timerTaskEntry.cancelled)
          taskExecutor.submit(timerTaskEntry.timerTask)
          }
          }

          该方法内调用了时间轮 TimingWheel.add 方法:

          • 如果返回 true,说明插入定时任务成功,if 条件不满足,方法结束;

          • 如果返回 false,会继续判断任务被取消了还是任务过期了

            • 如果是取消了,则什么都不做;

            • 如果是任务过期了,则将该任务提交到线程池等待执行任务逻辑

          advanceClock:向前驱动时钟,说白了就是将时间轮的 currentTime 指针向前嘀嗒指定的时间格

            def advanceClock(timeoutMs: Long): Boolean = {


            //获取delayQueue中下一个已过期的Bucket,只有已经过期的任务才会执行,否则还是保存在时间轮中
            var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)


            if (bucket != null) {
            //获取写锁
            // 一旦有线程持有写锁,其他任何线程执行add或advanceClock方法时会阻塞
            writeLock.lock()
            try {
            while (bucket != null) {
            // 推动时间轮向前"滚动"到Bucket的过期时间点
            timingWheel.advanceClock(bucket.getExpiration())
            // 将该Bucket下的所有定时任务重写回到时间轮,这是因为部分任务会进行降级
            bucket.flush(reinsert)
            // 读取下一个Bucket对象
            bucket = delayQueue.poll()
            }
            } finally {
            // 释放写锁
            writeLock.unlock()
            }
            true
            } else {
            false
            }
            }

            该方法的逻辑是:遍历 delayQueue 中的所有 Bucket,并将时间轮的时钟依次推进到它们的过期时间点,令它们过期。然后,再将这些 Bucket 下的所有定时任务全部重新插入回时间轮。插入回时间轮的时候,就会涉及到时间轮的降级。(关于时间轮的降级,可以看上一篇)

            SystemTimer 封装了操作时间轮的各种方法,供上层进行调用,这里的上层调用方就是 DelayedOperationPurgatory。该类是一个泛型类,其参数类型为 DelayedOperation 类。

            2. DelayedOperation

            类定义:
              abstract class DelayedOperation(override val delayMs: Long,//超时时间,一般是客户端请求的超时时间,即request.timeout.ms
              lockOpt: Option[Lock] = None) extends TimerTask with Logging {
              // 标识该延迟操作是否已经完成
              private val completed = new AtomicBoolean(false)
              // 防止多个线程同时检查操作是否可完成时发生锁竞争导致操作最终超时
              private val tryCompletePending = new AtomicBoolean(false)


              private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)
              ...
              }

              可以看到,DelayedOperation 类是一个抽象类,构造方法中的 delayMs 表示超时时间,一般是客户端 request.timeout.ms 参数的值。该类定义了两个关键的变量:

              • completed:标识该延迟操作是否已经完成,默认为 false

              • tryCompletePending:标识拿到锁的线程是否有机会再次检查条件是否已经满足,防止多个线程同时检查操作是否可完成时发生锁竞争导致操作最终超时。


              这个参数是在 1.1 版本引入的。在此之前,只有 completed 参数。但是,这样就可能存在这样一个问题:当多个线程同时检查某个延迟操作是否满足完成条件时,如果其中一个线程持有了锁(也就是上面的 lock 字段),然后执行条件检查,会发现不满足完成条件。而与此同时,另一个线程执行检查时却发现条件满足了,但是这个线程又没有拿到锁,此时,该延迟操作将永远不会有再次被检查的机会,会导致最终超时。

              关键方法:
              • forceComplete:强制完成延迟操作,不管它是否满足完成条件。每当操作满足完成条件或已经过期了,就需要调用该方法完成该操作。
              • isCompleted:检查延迟操作是否已经完成。源码使用这个方法来决定后续如何处理该操作。比如如果操作已经完成了,那么通常需要取消该操作。
              • onExpiration:强制完成之后执行的过期逻辑回调方法。只有真正完成操作的那个线程才有资格调用这个方法。
              • onComplete:完成延迟操作所需的处理逻辑。这个方法只会在 forceComplete 方法中被调用。
              • tryComplete:尝试完成延迟操作的顶层方法,内部会调用 forceComplete 方法。
              • maybeTryComplete:线程安全版本的 tryComplete 方法。现在外部代码调用的都是这个方法了。
              • run:调用延迟操作超时后的过期逻辑,也就是组合调用 forceComplete + onExpiration。
                  DelayedOperation 是一个抽象类,不同类型的延时请求,其 onExpiration、onComplete 和 tryComplete 方法的实现是不同的,比如 DelayedProduce 延时生产、DelayedFetch 延时拉取等实现了各自的逻辑。
                  这里重点看一下 maybeTryComplete 方法,该方法是为了规避因多线程访问产生锁争用导致线程阻塞,从而引发请求超时问题而添加的
                private[server] def maybeTryComplete(): Boolean = {
                //标记是否需要重试
                var retry = false
                //标记延时操作是否已经完成
                var done = false
                do {
                //尝试获取锁对象
                if (lock.tryLock()) {
                try {
                tryCompletePending.set(false)
                //尝试完成延迟操作
                done = tryComplete()
                } finally {
                lock.unlock()
                }
                // 运行到这里的线程持有锁,其他线程只能运行else分支的代码
                // 如果其他线程将maybeTryComplete设置为true,那么retry=true
                // 这就相当于其他线程给了本线程重试的机会
                retry = tryCompletePending.get()
                } else {


                // 运行到这里的线程没有拿到锁
                // 设置tryCompletePending=true给持有锁的线程一个重试的机会
                retry = !tryCompletePending.getAndSet(true)
                }
                } while (!isCompleted && retry)
                done
                }
                这个方法可能会被多个线程同时访问,根据是否拿到了锁,执行不同的逻辑:
                • 如果拿到了锁,则清空 tryCompletePending 状态,尝试完成延时请求,最后释放锁并读取最新的 retry 状态

                • 如果没有拿到锁,则设置 tryCompletePending 状态,来间接影响 retry 值,从而给获取到锁的线程一个重试的机会。重试通过 do...while 循环实现

                3. DelayedOperationPurgatory

                    前面提到,DelayedOperationPurgatory 是一个泛型类,它的参数类型是 DelayedOperation 的具体子类。因此,通常情况下每一类延时请求都对应一个 DelayedOperationPurgatory 实例,如上一篇提到的在初始化副本管理器时创建的四种管理器分别对应了四类延时请求。
                类定义:
                  final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,//Purgatory名称
                  timeoutTimer: Timer,//定时器,里面封装了分层时间轮
                  brokerId: Int = 0,//broker编号
                  purgeInterval: Int = 1000,//控制删除线程移除 Bucket 中的过期延迟请求的频率,默认为1秒
                  reaperEnabled: Boolean = true,//是否启动删除线程,默认为true
                  timerEnabled: Boolean = true)//是否启用分层时间轮,默认为true
                  extends Logging with KafkaMetricsGroup {
                  private val watcherLists = Array.fill[WatcherList](DelayedOperationPurgatory.Shards)(new WatcherList)


                  //管理器中预估的总的延时请求数
                  private[this] val estimatedTotalOperations = new AtomicInteger(0)


                  //过期操作收割机线程
                  private val expirationReaper = new ExpiredOperationReaper()
                  ...
                  }
                  构造方法的 6 个参数的含义如下:
                  • purgatoryName:延时操作管理器的名称

                  • timeoutTimer:定时器对象

                  • brokerId:Broker 的编号

                  • purgeInterval:用于控制收割机线程移除 Bucket 中的过期延迟请求的频率,在绝大部分情况下,都是 1 秒一次。可以通过生产者参数进行配置:producer.purgatory.purge.interval.requests

                  • reaperEnabled:是否开启收割机线程,默认开启

                  • timerEnabled:是否采用分层时间轮,默认采用

                  除此之外,还定义了三个关键的变量:
                  • watcherLists

                  • estimatedTotalOperations:管理器中总的延时请求数

                  • expirationReaper :过期操作收割机线程对象,会定期调用 advanceClock 方法驱动时间轮向前推进

                  在 DelayedOperationPurgatory 类里面,定义了三个内部类:

                  WatcherList:

                     private class WatcherList {
                    // 定义一组按照Key分组的Watchers对象,Key 可以是任何类型,而 Value 就是 Key 对应类型的一组 Watchers 对象。
                    val watchersByKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))


                    val watchersLock = new ReentrantLock()
                    // 返回所有Watchers对象
                    def allWatchers = {
                    watchersByKey.values
                    }
                    }
                        该类中最重要的变量就是 watchersByKey,是一个对象池。watchersByKey 的 Key 可以是任何类型,而 Value 就是 Key 对应类型的一组 Watchers 对象。
                    Watchers:
                        是基于 Key 的一个延迟请求的监控链表。这里的 key 可以是任意类型,Kafka 利用这个类来监控保存其中的延迟请求的可完成状态。
                      private class Watchers(val key: Any) {
                      //延迟请求的链表
                      private[this] val operations = new ConcurrentLinkedQueue[T]()
                      ...
                      }

                          Watchers 的本质是一个链表,它提供的方法主要就是操作链表的方法,比如tryCompleteWatched 方法会遍历整个链表,并尝试完成其中的延迟请求。

                        def tryCompleteWatched(): Int = {
                        //完成的请求数
                        var completed = 0


                        val iter = operations.iterator()
                        //遍历保存延时请求的链表
                        while (iter.hasNext) {
                        val curr = iter.next()
                        if (curr.isCompleted) {


                        //如果请求完成了就从链表中移除
                        iter.remove()
                        //如果尝试完成了请求,则将请求从链表中移除,并更新完成的请求数
                        } else if (curr.maybeTryComplete()) {
                        iter.remove()
                        completed += 1
                        }
                        }


                        if (operations.isEmpty)
                        removeKeyIfEmpty(key, this)
                        //返回完成的延时请求数
                        completed
                        }

                        ExpiredOperationReaper:

                            是一个线程类,逻辑就是每隔 200 毫秒调用一次 advanceClock 方法驱动时间轮向前推进。
                          private class ExpiredOperationReaper extends ShutdownableThread(
                          "ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
                          false) {


                          override def doWork() {
                          advanceClock(200L)
                          }
                          }

                          这里为什么是每隔 200 毫秒向前驱动一次时间轮,而不是每毫秒驱动一次呢?
                              首先 advanceClock 内部调用的是 SystemTimer.advanceClock 方法,会根据给定的超时时间从 SystemTimer 的 DelayQueue 队列中取出超时的任务列表bucket(类型为TimerTaskList)。队列中的 bucket 是按照过期时间进行排序的,最早过期的会排在队头。假设队头 bucket 的超时时间为 200ms,第二个 bucket 的超时时间为 800ms,如果每隔 1ms 向前推进一次,那么获取队头 bucket 时执行的 200 次推进中有 199 次是 "空推进",而获取第二个 bucket 时执行的 600 次推进中有 599 次是 "空推进",这样会无故消耗机器的性能。
                              从这里也可以看出,Kafka 用时间轮做擅长的定时任务插入和删除操作,而使用 DelayQueue 做擅长的时间推进工作,两者各司其职,做到精准推进。

                          DelayedOperationPurgatory 中的两个关键方法:

                          ① tryCompleteElseWatch:检查操作是否能够完成,如果不能的话,就把它加入到对应 Key 所在的 WatcherList 中

                            def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
                            assert(watchKeys.nonEmpty, "The watch key list can't be empty")


                            //尝试完成给定的操作
                            var isCompletedByMe = operation.tryComplete()
                            // 如果该延迟请求是由本线程完成的,直接返回true即可
                            if (isCompletedByMe)
                            return true


                            var watchCreated = false
                            // 遍历所有要监控的Key
                            for(key <- watchKeys) {
                            // 再次查看请求的完成状态,如果已经完成,就说明是被其他线程完成的,返回false
                            if (operation.isCompleted)
                            return false
                            // 否则,将该operation加入到Key所在的WatcherList
                            watchForOperation(key, operation)


                            // 设置watchCreated标记,表明该任务已经被加入到WatcherList
                            if (!watchCreated) {
                            watchCreated = true
                            // 更新Purgatory中总请求数
                            estimatedTotalOperations.incrementAndGet()
                            }
                            }
                            // 再次尝试完成该延迟请求
                            isCompletedByMe = operation.maybeTryComplete()
                            if (isCompletedByMe)
                            return true


                            // 如果依然不能完成此请求,将其加入到时间轮
                            if (!operation.isCompleted) {
                            if (timerEnabled)
                            timeoutTimer.add(operation)
                            //再次判断,如果该请求已完成,则取消时间轮中的定时任务
                            if (operation.isCompleted) {
                            //取消定时任务
                            operation.cancel()
                            }
                            }


                            false
                            }

                                该方法首先调用 tryComplete 方法,尝试完成延迟请求,如果返回结果是 true,就说明执行 tryCompleteElseWatch 方法的线程正常地完成了该延迟请求,也就不需要再添加到 WatcherList 了,直接返回 true 就行了。

                                否则的话,代码会遍历所有要监控的 Key,再次查看请求的完成状态。如果已经完成,就说明是被其他线程完成的,返回 false;如果依然无法完成,则将该请求加入到 Key 所在的 WatcherList 中,等待后续完成。同时,设置 watchCreated 标记,表明该任务已经被加入到 WatcherList ,然后更新 Purgatory 中总请求数。

                                待遍历完所有 Key 之后,源码会再次尝试完成该延迟请求,如果完成了,就返回 true;如果还是不能完成,且启用了时间轮,则将该任务添加到时间轮中

                                然后后再次判断这个任务是否被其他线程完成,如果完成了则取消该任务,最后返回 false。

                            综合来看,这个方法主要做了两件事:
                            • 先尝试完成延时请求

                            • 如果无法完成,就加入到 WatcherList,等待后面再试

                            ② checkAndComplete:检查给定 Key 所在的 WatcherList 中的延迟请求是否满足完成条件,如果是的话,则结束掉它们,也就是进行任务的重试
                              def checkAndComplete(key: Any): Int = {
                              // 获取给定Key对应的WatcherList
                              val wl = watcherList(key)


                              // 获取WatcherList中Key对应的Watchers对象实例,里面存储了需要完成的多个延时请求任务
                              val watchers = inLock(wl.watchersLock) { wl.watchersByKey.get(key) }


                              // 尝试完成满足完成条件的延时请求并返回成功完成的请求数
                              if(watchers == null)
                              0
                              else
                              //重要步骤:调用 Watchers 的 tryCompleteWatched 方法,尝试完成那些已满足完成条件的延迟请求
                              watchers.tryCompleteWatched()
                              }
                                  该方法首先会根据给定的 Key,获取 watcherLists 数组中 key 对应的 WatcherList 对象,然后从  WatcherList 对象池中根据 key 获取对应的 Watchers 对象实例,Watchers 中保存了给定 key 对应的所有延时请求。
                                  然后尝试完成满足条件的延时请求,并返回成功完成的请求数。Watchers 的 tryCompleteWatched 方法用于尝试完成那些已满足完成条件的延迟请求。
                              四、通过延时生产请求分析整个延时请求的处理流程
                                  当调用 ReplicaManager.appendRecords 方法写入数据时,会根据请求中的 ack 参数判断是否需要等待 Follower 副本完成写入,如果需要,会把这个生产请求放到对应的延时生产请求管理器中,代码如下:
                                //是否需要等待其它副本完成写入,通过delayedProduceRequestRequired方法来判断写入是否成功,如果为true,表示需要等待其它副本同步完成消息写入
                                if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
                                //创建延时生产任务对象
                                val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
                                //创建DelayedProduce延时请求对象
                                val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)


                                val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq


                                //再一次尝试完成该延时请求
                                //如果暂时无法完成,则将对象放入到相应的Purgatory中等待后续处理
                                delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)


                                } else {
                                //可以立即返回
                                val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
                                //调用该回调逻辑然后返回
                                responseCallback(produceResponseStatus)
                                }
                                    首先,创建了 DelayedProduce 实例对象,该类是上面提到的 DelayedOperation 抽象类的具体实现,主要实现了 tryComplete、onExpiration 和 onComplete 方法,分别如下:
                                tryComplete:尝试完成延时请求。可以完成的条件如下:
                                如果请求中的所有分区均满足以下任意条件,则可以完成该延时请求:
                                • a. 当前 Broker 不再是分区的 Leader 副本所在节点,会在响应中封装一个错误信息
                                • b. 当前 Broker 是分区的 Leader 副本所在节点:
                                  • b-1:检查是否所有 Follower 副本完成写入操作时抛出一个本地错误,那么会在响应中封装一个错误信息
                                  • b-2:如果所有 Follower 副本完成写入操作,返回正常响应
                                  override def tryComplete(): Boolean = {


                                  //遍历请求中的分区,检查写到这些分区的数据是否已经同步到了所有Follower副本上
                                  produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
                                  trace(s"Checking produce satisfaction for $topicPartition, current status $status")


                                  if (status.acksPending) {
                                  val (hasEnough, error) = replicaManager.getPartition(topicPartition) match {
                                  case Some(partition) =>
                                  if (partition eq ReplicaManager.OfflinePartition)
                                  (false, Errors.KAFKA_STORAGE_ERROR)
                                  else
                                  partition.checkEnoughReplicasReachOffset(status.requiredOffset)
                                  case None =>

                                  (false, Errors.UNKNOWN_TOPIC_OR_PARTITION)
                                  }


                                  if (error != Errors.NONE || hasEnough) {
                                  status.acksPending = false
                                  status.responseStatus.error = error
                                  }
                                  }
                                  }


                                  if (!produceMetadata.produceStatus.values.exists(_.acksPending))
                                  //强制完成延时请求,内部会首先会取消时间轮上的任务,然后调用 onComplete 方法
                                  forceComplete()
                                  else
                                  false
                                  }
                                      如果可以完成该延时请求,那么会调用 forceComplete 方法(在DelayedOperation类中定义)来强制完成,内部首先会取消时间轮上保存的该延时任务,然后调用 onComplete 方法执行回调逻辑:
                                  forceComplete:
                                    def forceComplete(): Boolean = {
                                    if (completed.compareAndSet(false, true)) {
                                    //取消时间轮上的定时任务
                                    cancel()
                                    //完成延时操作
                                    onComplete()
                                    true
                                    } else {
                                    false
                                    }
                                    }
                                    onComplete:
                                      override def onComplete() {
                                      val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus)
                                      //执行回调逻辑
                                      responseCallback(responseStatus)
                                      }
                                      综上:对于延时请求,Kafka 的处理流程如下:
                                      • 首先会尝试完成该请求,如果完成了则直接返回。
                                      • 如果无法完成,会将该请求放到延时任务管理器中,等待外部事件触发,同时,会将该延时请求封装成一个任务放到时间轮中进行管理。
                                      • 在该请求超时之前,一旦有外部事件触发,就会尝试执行,如果完成了,就会取消时间轮上对应的定时任务,然后调用回调函数返回。
                                      • 如果直到请求超时也无法完成,则会封装对应的异常信息并返回,此时该任务也会从时间轮上移除。

                                      注意:在整个流程中,延时请求会在两个地方进行维护,一个是 DelayedOperationPurgatory 延时操作管理器,另一个是时间轮。延时操作管理器会根据外部事件的触发来不断尝试完成延时请求。而时间轮的作用更像是一个保险,如果超时之前该请求完成了,那么会取消时间轮上的定时任务;如果直到超时也无法完成,这时也会将任务从时间轮移除,并返回对应的超时异常。

                                      参考:

                                      极客时间《Kafka核心源码解读》

                                      《深入理解Kafka核心设计与实践原理》

                                      文章转载自大数据记事本,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                      评论