暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

深入理解Kafka服务端之高效的时间轮算法在定时任务中的应用

大数据记事本 2021-01-26
3027
一、场景分析
    在 Kafka 中,存在着大量的延时操作,比如在 ReplicaManager 的构造方法中,就定义了四个延时相关的 Purgatory 对象,分别用来处理延时生产、延时拉取、延时删除以及延时 Leader 选举操作,代码如下:
    // 处理延时PRODUCE生产请求的Purgatory
    valdelayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
    // 处理延时FETCH拉取请求的Purgatory
    valdelayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
    // 处理延时DELETE_RECORDS删除请求的Purgatory
    valdelayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
    // 处理延时ELECT_LEADERS leader选举请求的Purgatory
    valdelayedElectPreferredLeaderPurgatory: DelayedOperationPurgatory[DelayedElectPreferredLeader],

    深入理解Kafka服务端之副本管理器介绍及Leader副本和Follower副本的切换

        Kafka 并没有使用 JDK 自带的 Timer 或 DelayQueue 来实现延时功能,因为其插入和删除操作的平均时间复杂度均为O(nlogn),并不能满足 Kafka 高性能的要求。基于时间轮的概念,Kafka 自定义实现了一个用于延时功能的定时器:SystemTimer。该定时器可以将插入和删除操作的时间复杂度都降为O(1)。
    二、图示说明
    单层时间轮的整体结构

        如上图:Kafka 中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,其中每个元素是一个任务列表(TimerTaskList),TimerTaskList 是一个双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),每个定时任务项中封装了真正的定时任务(TimerTask)。
        在每一层时间轮中,都包含几个重要的概念:
    • tickMs:时间轮的基本时间跨度,每个时间格代表一个时间跨度。
    • wheelSize:每层时间轮的时间格的个数。对于每层时间轮来说,这个值是固定的
    • interval:总体时间跨度。可以通过公式:tickMs * wheelSize 计算得出
    • currentTime:表盘指针,表示当前时间轮所处的时间。这里对时间做了调整,表示小于当前时间的最大时间跨度的整数倍。假设当前时间戳为 123 毫秒,时间轮每格跨度为 20 ms,那么 currentTimer 就是小于 123 且 20 的整数倍,即 120 ms。currentTime 将整个时间轮划分为到期部分未到期部分,currentTime 当前执行的时间格也属于到期部分,表示刚好到期。需要处理当前时间格对应的 TimerTaskList 中的所有任务。
        下层时间轮的 interval 就是上层时间轮的 tickMs。例如,Kafka 中时间轮每层的 wheelSize 固定为 20:
    • 第一层的 tickMs 为 1 ms,所以第一层的 interval = 1 * 20 =20 ms;
    • 第二层的 tickMs = 第一层的 interval,即 20 ms,所以第二层的 interval = 20 * 20 = 400 ms,以此类推。

    时间轮的工作机制:

        注意:这里为了画图方便,每层时间轮中 wheelSize 采用 8 做示范,实际 Kafka 中 wheelSize 为 20。
        初始情况下,表盘指针指向时间格 0,此时有一个定时为 2 ms的任务添加到时间轮,会被放到时间格为 2 的 TimerTaskList 中。

        随着时间的推移,指针 currentTime 不断向前推进,过了 2ms 之后,当 currentTime 指向时间格 2 时,就需要将对应的 TimerTaskList 中的任务进行到期操作。

        假如此时有一个定时为 3ms 的任务插入,会被放到时间格 5 中;如果此时有一个定时为 7ms 的任务插入,那么会复用原来的 TimerTaskList,会被放到时间格 1 中。

        假如此时有一个定时为 30ms 的任务插入,当前时间轮无法容纳,应该怎么办?如果直接扩容 wheelSize 的大小,那么当延时更长的任务插入时,需要不断扩容,这不仅占用了很大的内存空间,而且也会拉低效率。Kafka 这里采用了分层时间轮的概念,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。
        还是上面的例子,当插入定时为 30ms 的任务时,这个任务会被插入第二层时间轮的时间格 3 中,如下图:

    时间轮的降级:

        假设此时有一个定时为 110ms 的的定时任务,被插入到了第三层时间轮的时间格 1 中(第三层时间轮中,时间格 0 包含 [0,64) 区间内的定时任务,时间格 1 中包含 [64,128) 区间内的定时任务)。
        随着时间的推移,当第三层时间轮时间格 1 对应的 TimerTaskList 到期时,原本设定为 110ms 的定时任务还剩下 110-64=46ms,此时还不能执行这个任务的到期操作,这里就会进行一个时间轮的降级,会将这个剩余时间为 46ms 的定时任务重新提交到层级时间轮。此时第一层时间跨度仍然不够,但是第二层时间跨度已经足够插入这个定时任务,所以会将该任务添加到第二层时间轮的时间格 5 中([ 40,48))。
        再过 40ms 之后,当第二层时间格 5 对应的 TimerTaskList 到期后,由于还剩 6ms,该任务还无法执行到期操作,会再次被降级放到第一层时间轮的时间格 6 中。
        再经过 6ms,此任务真正到期,最终执行相应的到期操作。

    三、源码分析

    时间轮相关的类或接口主要有以下几个:
    • TimerTask:定时任务

    • TimerTaskEntry:封装了定时任务的定时任务项

    • TimerTaskList:定时任务列表,是一个双向链表,链表的每个元素为 TimerTaskEntry

    • TimingWheel:时间轮

    1.TimerTask

      trait TimerTask extends Runnable {
      // 延时时间。通常是request.timeout.ms参数值
      val delayMs: Long
      // 每个TimerTask实例关联一个TimerTaskEntry
      // 就是说每个定时任务需要知道它在哪个Bucket链表下的哪个链表元素上
      private[this] var timerTaskEntry: TimerTaskEntry = null
      // 取消定时任务,原理就是将关联的timerTaskEntry置空
      def cancel(): Unit = {
      synchronized {
      if (timerTaskEntry != null) timerTaskEntry.remove()
      timerTaskEntry = null
      }
      }
      // 关联timerTaskEntry,原理是给timerTaskEntry字段赋值
      private[timer] def setTimerTaskEntry(entry: TimerTaskEntry): Unit = {
          synchronized {
      if (timerTaskEntry != null && timerTaskEntry != entry)
      timerTaskEntry.remove()


      timerTaskEntry = entry
      }
      }
      // 获取关联的timerTaskEntry实例
      private[timer] def getTimerTaskEntry(): TimerTaskEntry = {
      timerTaskEntry
      }


      }
          TimerTask 是一个 Scala 接口(Trait),继承了 Runnable。每个 TimerTask 都有一个 delayMs 字段,表示这个定时任务的超时时间。通常来说,这就是客户端参数 request.timeout.ms 的值。TimerTask绑定了一个 timerTaskEntry 字段,因为,每个定时任务都要知道,它存放在哪个定时任务项 TimerTaskEntry 对象中。
          同时,为 TimerTaskEntry 对象提供了 get 和 set 方法。set 方法在给 timerTaskEntry 赋值之前,必须要先考虑这个定时任务是否已经绑定了其他的 timerTaskEntry,如果是的话,就必须先取消绑定。另外, 整个 set 方法体必须由 monitor 锁保护起来,以保证线程安全性。
      2. TimerTaskEntry
        private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {


        @volatile
        // 绑定的Bucket链表实例
        //Kafka 的延时请求可能会被其他线程从一个链表搬移到另一个链表中,因此,为了保证必要的内存可见性,代码声明 list 为 volatile
        var list: TimerTaskList = null
        var next: TimerTaskEntry = null
        var prev: TimerTaskEntry = null
        // 关联给定的定时任务
        if (timerTask != null) timerTask.setTimerTaskEntry(this)


        // 关联定时任务是否已经被取消了
        def cancelled: Boolean = {
        timerTask.getTimerTaskEntry != this
        }


        // 从Bucket链表中移除自己
        def remove(): Unit = {
            var currentList = list


        //需要注意的是,置空这个动作是在 TimerTaskList 的 remove 中完成的,而这个方法可能会被其他线程同时调用,
        // 因此,下面的代码使用了 while 循环的方式来确保 TimerTaskEntry 的 list 字段确实被置空了。这样,Kafka 才能安全地认为此链表元素被成功移除。
        while (currentList != null) {
        //将自身节点从所属的双向列表上移除
        currentList.remove(this)
        currentList = list
        }
        }


        override def compare(that: TimerTaskEntry): Int = {
        this.expirationMs compare that.expirationMs
        }
        }
            该类用于定义双向链表中的基本元素,其封装了基本的任务对象 TimerTask 及 过期时间。同时,维护了一个 TimerTaskList 对象,表征自身所在的 TimerTaskList 双向链表。
            remove 方法的逻辑是将 TimerTask 自身从双向链表中移除掉,因此,代码调用了 TimerTaskList 的 remove 方法来做这件事。一旦 TimerTaskList 对象置空,表示当前 TimerTaskEntry 不再属于任何一个双向链表,也就相当于移除了。
            需要注意的是,置空这个动作是在 TimerTaskList 的 remove 中完成的,而这个方法可能会被其他线程同时调用,因此,remove方法中使用了 while 循环的方式来确保 TimerTaskEntry 的 list 字段确实被置空了。这样,Kafka 才能安全地认为此链表元素被成功移除。

        3. TimerTaskList

        类定义:

          private[timer] class TimerTaskList(taskCounter: AtomicInteger //用于标识当前这个链表中的总定时任务数
          ) extends Delayed {


          private[this] val root = new TimerTaskEntry(null, -1)
          root.next = root
          root.prev = root
          //表示这个链表所在 Bucket 的过期时间戳
          private[this] val expiration = new AtomicLong(-1L)
            ...
            }
          其中,关键属性的含义如下:
          • taskCounter:用于标识当前这个链表中的总任务数

          • expiration:表示这个链表所在的时间格(bucket)的过期时间戳

              同一层的 Bucket 的时间间隔都是一样的。只有当前时间越过了 Bucket 的起始时间,这个 Bucket 才算是过期。而这里的起始时间,就是代码中 expiration 字段的值。

          主要方法:

          setExpiration:设置过期时间戳。这里使用了 AtomicLong 的 CAS 方法 getAndSet 原子性地设置了过期时间戳,之后将新过期时间戳和旧值进行比较,看看是否不同,然后返回结果,如果不同则返回 true。

            def setExpiration(expirationMs: Long): Boolean = {
            //通过getAndSet的CAS操作原子性地设置了过期时间戳
            // 之后将新过期时间戳和旧值进行比较,看看是否不同,然后返回结果,不同返回true


            //这里为什么要比较新旧值是否不同呢?这是因为,目前 Kafka 使用一个 DelayQueue 统一管理所有的 Bucket,也就是 TimerTaskList 对象。
            // 随着时钟不断向前推进,原有 Bucket 会不断地过期,然后失效。当这些 Bucket 失效后,源码会重用这些 Bucket。
            // 重用的方式就是重新设置 Bucket 的过期时间,并把它们加回到 DelayQueue 中。
            // 这里进行比较的目的,就是用来判断这个 Bucket 是否要被插入到 DelayQueue。如果为true则插入到DelayQueue中
            expiration.getAndSet(expirationMs) != expirationMs
            }
            flush:清空链表中的 TimerTaskEntry 元素,并对所有元素执行给定的 f 函数。该方法用于将高层时间轮上的定时任务重新插入到底层时间轮,也就是上面提到的降级。
              def flush(f: (TimerTaskEntry) => Unit): Unit = {
              synchronized {
              // 找到链表第一个元素
              var head = root.next
              // 开始遍历链表
              while (head ne root)
                    // 移除遍历到的链表元素
              remove(head)
              // 执行传入函数f的逻辑
              f(head)
              head = root.next
              }
                  // 清空过期时间设置
              expiration.set(-1L)
              }
              此外,还提供了 add 和 remove 方法用于将 TimerTaskEntry 插入链表或从链表移除,基本就是链表的操作,这里不做详细说明。
              add:将定时任务插入双向链表
                def add(timerTaskEntry: TimerTaskEntry): Unit = {
                var done = false
                  while (!done) {
                // 在添加之前尝试移除该定时任务,保证该任务没有在其他链表中
                timerTaskEntry.remove()


                synchronized {
                timerTaskEntry.synchronized {
                        if (timerTaskEntry.list == null) {
                val tail = root.prev
                timerTaskEntry.next = root
                timerTaskEntry.prev = tail
                timerTaskEntry.list = this
                // 把timerTaskEntry添加到链表末尾
                tail.next = timerTaskEntry
                root.prev = timerTaskEntry
                taskCounter.incrementAndGet()
                done = true
                }
                }
                }
                }
                }
                remove:将指定的 TimerTaskEntry 从双向链表移除
                  def remove(timerTaskEntry: TimerTaskEntry): Unit = {
                  synchronized {
                  timerTaskEntry.synchronized {
                  //如果该TimerTaskEntry的list对象就是当前TimerTaskList对象,说明该TimerTaskEntry在当前链表上
                  if (timerTaskEntry.list eq this) {
                  //将下一个节点的前置指针指向当前节点的前置指针
                  timerTaskEntry.next.prev = timerTaskEntry.prev
                  //将上一个节点的后置指针指向当前节点的后置指针
                  timerTaskEntry.prev.next = timerTaskEntry.next
                  //将给定TimerTaskEntry的前置、后置指针和所属TimerTaskList对象置空
                  timerTaskEntry.next = null
                  timerTaskEntry.prev = null
                  timerTaskEntry.list = null
                  //更新链表上节点个数
                  taskCounter.decrementAndGet()
                  }
                  }
                  }
                  }

                  4. TimingWheel

                  类定义:

                    private[timer] class TimingWheel(tickMs: Long, //时间格跨度,类似于手表的例子中向前推进一格的时间,第一层时间轮的tickMs为1毫秒
                    wheelSize: Int,//每一层时间轮上的 Bucket 数量。第 1 层的 Bucket 数量是 20
                    startMs: Long,//时间轮对象被创建时的起始时间戳
                    taskCounter: AtomicInteger,//这一层时间轮上的总定时任务数
                    queue: DelayQueue[TimerTaskList]//将所有 Bucket 按照过期时间排序的延迟队列。
                    // 随着时间不断向前推进,Kafka 需要依靠这个队列获取那些已过期的 Bucket,并清除它们
                    ) {


                    //这层时间轮总时长,等于时间格跨度乘以 wheelSize。对于第一层时间轮来说,tickMs=1MS,wheelSize=20,所以interval=20MS
                    private[this] val interval = tickMs * wheelSize


                    //时间轮下的所有 Bucket 对象,也就是所有 TimerTaskList 对象
                    private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }


                    //当前时间戳,只是源码对它进行了一些微调整,将它设置成小于当前时间的最大时间格跨度的整数倍。
                    // 例如:假设时间格跨度是 20 毫秒,当前时间戳是 123 毫秒,那么,currentTime 会被调整为 120 毫秒。
                      private[thisvar currentTime = startMs - (startMs % tickMs) 
                    //上层时间轮,按需创建
                    @volatile private[this] var overflowWheel: TimingWheel = null
                    TimingWheel 中定义了 9 个属性,各自的含义如下:
                    • tickMs:每一个时间格的跨度。在 Kafka 中,第 1 层时间轮的 tickMs 被固定为 1 毫秒,也就是说,向前推进一格 Bucket 的时长是 1 毫秒。

                    • wheelSize:每一层时间轮上的时间格 Bucket 的数量。第 1 层的 Bucket 数量是 20。

                    • startMs:时间轮对象被创建时的起始时间戳。

                    • taskCounter:这一层时间轮上的总定时任务数。

                    • queue:将所有 Bucket 按照过期时间排序的延迟队列。随着时间不断向前推进,Kafka 需要依靠这个队列获取那些已过期的 Bucket,并清除它们。

                    • interval:这层时间轮总时长,等于时间格跨度乘以 wheelSize。以第 1 层为例,interval 就是 20 毫秒。由于下一层时间轮的时间格跨度就是上一层的总时长,因此,第 2 层的时间格跨度就是 20 毫秒,总时长是 400 毫秒,以此类推。

                    • buckets:时间轮下的所有 Bucket 对象,也就是所有 TimerTaskList 对象。

                    • currentTime:当前时间戳,只是源码对它进行了一些微调整,将它设置成小于当前时间的最大时间格跨度的整数倍。举个例子,假设tickMs时长是 20 毫秒,当前时间戳是 123 毫秒,那么,currentTime 会被调整为 120 毫秒。

                    • overflowWheel:上层时间轮对象,不一定存在,按需创建

                    TimingWheel 的关键方法:

                     addOverflowWheel:用于创建上层时间轮对象

                      private[this] def addOverflowWheel(): Unit = {
                      synchronized {
                      if (overflowWheel == null) {
                      //创建一个新的时间轮对象,其起始时间就是当前时间,时间格跨度就是当前时间轮的总时长,且每层时间轮的轮子数量wheelSize是一样的
                      //例如:第一层时间轮的总时长为20MS,而第二层时间轮一个bucket的时间跨度就是20MS,总时长为400MS
                      overflowWheel = new TimingWheel(
                      tickMs = interval,
                      wheelSize = wheelSize,
                      startMs = currentTime,
                      taskCounter = taskCounter,
                      queue
                      )
                      }
                      }
                      }
                       add:将定时任务项 TimerTaskEntry 添加到时间轮中
                        def add(timerTaskEntry: TimerTaskEntry): Boolean = {
                        // 获取定时任务的过期时间戳
                        val expiration = timerTaskEntry.expirationMs
                        // 如果该任务已然被取消了,则无需添加,直接返回false
                        if (timerTaskEntry.cancelled) {
                        false
                        // 如果该任务超时时间已过期,返回false
                        } else if (expiration < currentTime + tickMs) {
                        false
                        //如果该任务超时时间在本层时间轮覆盖时间范围内
                        } else if (expiration < currentTime + interval) {
                        //计算要被放入到哪个Bucket中
                        val virtualId = expiration tickMs
                        //获取对应的bucket
                        val bucket = buckets((virtualId % wheelSize.toLong).toInt)
                        //将元素添加到bucket对应的双向链表
                        bucket.add(timerTaskEntry)
                        // 设置Bucket过期时间
                        // 如果该时间变更过,说明Bucket是新建或被重用,将其加回到DelayQueue
                        if (bucket.setExpiration(virtualId * tickMs)) {
                        //将bucket添加到queue中
                        queue.offer(bucket)
                        }
                        true
                        // 如果本层时间轮无法容纳该任务,交由上层时间轮处理
                        } else {


                        //如果上层时间轮对象为空,创建新的上层时间轮
                        if (overflowWheel == null) addOverflowWheel()
                        overflowWheel.add(timerTaskEntry)
                        }
                        }

                        该方法的逻辑如下:

                        • 第一步:获取定时任务的过期时间戳。所谓过期时间戳,就是这个定时任务过期时的时点。
                        • 第二步:看定时任务是否已被取消。如果已经被取消,则无需加入到时间轮中。如果没有被取消,就接着看这个定时任务是否已经过期。如果过期了,也不用加入到时间轮中。如果没有过期,再看这个定时任务的过期时间是否能够被涵盖在本层时间轮的时间范围内。如果可以,则进入到下一步。
                        • 第三步:算目标 Bucket 序号,也就是这个定时任务需要被保存在哪个 TimerTaskList 中确定了目标 Bucket 序号之后,代码会将该定时任务添加到这个 Bucket 下,同时更新这个 Bucket 的过期时间戳
                        • 第四步:如果这个 Bucket 是首次插入定时任务,那么,还同时要将这个 Bucket 加入到 DelayQueue 中,方便 Kafka 轻松地获取那些已过期 Bucket,并删除它们。如果定时任务的过期时间无法被涵盖在本层时间轮中,那么,就按需创建上一层时间轮,然后在上一层时间轮上完整地执行刚刚所说的所有逻辑。
                        advanceClock:向前驱动时钟的方法
                          def advanceClock(timeMs: Long): Unit = {
                          // 向前驱动到的时点要超过Bucket的时间范围,才是有意义的推进,否则什么都不做
                          // 更新当前时间currentTime到下一个Bucket的起始时点
                          /**
                          * 假设当前时间轮为第二层时间轮,其tickMs=20MS
                          * currentTime=40MS,timeMs=70MS,此时满足if条件,向前推进
                          * 更新 currentTime = 70 - (70 % 20)= 70 - 10 = 60 MS,也就是 40 MS时间轮下一个轮子的起始时间点
                          */
                          if (timeMs >= currentTime + tickMs) {
                          currentTime = timeMs - (timeMs % tickMs)
                          // 同时尝试为上一层时间轮做向前推进动作
                          if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
                          }
                          }

                              参数 timeMs 表示要把时钟向前推动到这个时点。向前驱动到的时点必须要超过 Bucket 的时间范围,才是有意义的推进,否则什么都不做,毕竟它还在 Bucket 时间范围内。

                              相反,一旦超过了 Bucket 覆盖的时间范围,代码就会更新当前时间 currentTime 到下一个 Bucket 的起始时点,同时递归地为上一层时间轮做向前推进动作。

                              推进时钟的动作是由 Kafka 后台专属的 Reaper 线程发起的。


                          参考:

                          极客时间《Kafka核心源码解读》

                          《深入理解Kafka核心设计与实践原理》

                          文章转载自大数据记事本,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                          评论