暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

深入理解Kafka服务端之滚动生成新日志段的流程及条件

大数据记事本 2020-12-10
1924
一、场景分析
    之前提到,Kafka中一个分区对应一个Log对象,每个Log对象下面又划分了多个日志段LogSegment。那么这些日志段的划分策略是什么?即满足什么条件时会生成新的日志段,以及生成新日志段的流程是什么样的。这篇来进行详细的分析。
二、图示说明
滚动生成新的日志段流程:

三、源码分析

    1. 新的日志段是如何生成的。

    由于LogSegment是由Log管理的,所以按理说,控制生成新的LogSegment的方法就应该在Log类中。这个方法就是roll()方法,它的注释如下:

    * Roll the log over to a new active segment starting with the current logEndOffset.
    * This will trim the index to the exact size of the number of entries it currently contains.
    大概翻译就是:
        滚动生成新的active日志段,以当前的LogEndOffset作为日志段的起始偏移量。
        这个方法将裁剪索引文件至实际大小。(具体为何裁剪索引文件可以看《深入理解Kafka服务端之索引文件及mmap内存映射》)
      这里要重点关注一个概念:
        LogEndOffset,简称LEO。是Kafka服务端十分重要的一个概念,后面还会多次提及。
        表示的是下一条待写入消息的偏移量。

      生成新日志段的流程分析:

        def roll(expectedNextOffset: Option[Long] = None): LogSegment = {
        maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") {
        //记录开始时间
        val start = time.hiResClockMs()
        lock synchronized {
        //检查索引文件的内存映射是否关闭
        checkIfMemoryMappedBufferClosed()
        //计算新日志段文件的起始偏移量baseOffset
        val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset)
        //生成新的日志文件
        val logFile = Log.logFile(dir, newOffset)
        //如果newOffset对应的日志段文件已存在
              if (segments.containsKey(newOffset)) {
        //如果active日志段的起始偏移量和上面计算的起始偏移量相同,且active日志段的日志文件大小为0,则删除active日志段
                if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) {
        warn(s"Trying to roll a new log segment with start offset $newOffset " +
        s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already " +
        s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," +
        s" size of offset index: ${activeSegment.offsetIndex.entries}.")
        //异步删除active日志段(先标记为.delete)
        deleteSegment(activeSegment)
        } else {
        //如果日志段文件已存在且不是active,则抛异常
        throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with start offset $newOffset" +
        s" =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " +
        s"segment is ${segments.get(newOffset)}.")
        }
        //如果计算出的baseOffset比active日志段的baseOffset还小,则抛异常
        } else if (!segments.isEmpty && newOffset < activeSegment.baseOffset) {
        throw new KafkaException(
        s"Trying to roll a new log segment for topic partition $topicPartition with " +
        s"start offset $newOffset =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active segment $activeSegment")
        } else {
        //代码走这里说明newOffset正常
        val offsetIdxFile = offsetIndexFile(dir, newOffset)//生成index文件名
        val timeIdxFile = timeIndexFile(dir, newOffset)//生成timeindex文件名
        val txnIdxFile = transactionIndexFile(dir, newOffset)//生成txnindex文件名
        //如果文件已存在则删除
        for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
        warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first")
        Files.delete(file.toPath)
        }


        Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment())
              }
              
        producerStateManager.updateMapEndOffset(newOffset)
        producerStateManager.takeSnapshot()
        //生成新的日志段对象
        val segment = LogSegment.open(dir,
        baseOffset = newOffset,
        config,
        time = time,
        fileAlreadyExists = false,
        initFileSize = initFileSize,
        preallocate = config.preallocate)
        //将新的日志段对象添加到Map
        addSegment(segment)
        //更新LEO
        updateLogEndOffset(nextOffsetMetadata.messageOffset)
              // 异步刷写旧的日志段
        scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)


        info(s"Rolled new log segment at offset $newOffset in ${time.hiResClockMs() - start} ms.")
        //返回生成的日志段对象
        segment
        }
        }
        }

        a. 检查该Log对象的索引文件内存映射是否关闭

          //检查索引文件的内存映射是否关闭,默认未关闭,如果关闭了则该Log对象就不允许进行磁盘IO操作了
          checkIfMemoryMappedBufferClosed()

          是否关闭由一个boolean类型的变量标记,默认为false

            @volatile private var isMemoryMappedBufferClosed = false

            b. 计算新的日志段文件的起始偏移量baseOffset

              //计算新日志段文件的起始偏移量baseOffset
              val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset
              这里newOffset取的是调用roll方法传入的Long类型的参数和LEO的较大值

              c. 生成新的日志文件,即.log文件。参数dir就是该Log对象对应的分区在磁盘上的物理路径

                //生成新的日志文件
                val logFile = Log.logFile(dir, newOffset)
                d.验证起始偏移量的合法性
                    d1. 如果起始偏移量newOffset对应的日志段对象已存在:
                • 如果当前active日志段的起始偏移量baseOffset和newOffset一样,且active日志段里面没有保存任何消息,则异步删除active日志段
                • 如果不是active日志段,则抛出异常
                  if (segments.containsKey(newOffset)) {
                  //如果active日志段的起始偏移量和上面计算的起始偏移量相同,且active日志段的日志文件大小为0,则删除active日志段
                    if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) {
                  warn(s"Trying to roll a new log segment with start offset $newOffset " +
                  s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already " +
                  s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," +
                  s" size of offset index: ${activeSegment.offsetIndex.entries}.")
                  //异步删除active日志段(先标记为.delete)
                  deleteSegment(activeSegment)
                  } else {
                  //如果日志段文件已存在且不是active,则抛异常
                  throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with start offset $newOffset" +
                  s" =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " +
                  s"segment is ${segments.get(newOffset)}.")
                    }
                  }

                      d2. 如果计算出的newOffset比active日志段的起始偏移量baseOffset还小,则抛异常:

                    else if (!segments.isEmpty && newOffset < activeSegment.baseOffset) {
                    throw new KafkaException(
                    s"Trying to roll a new log segment for topic partition $topicPartition with " +
                    s"start offset $newOffset =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active segment $activeSegment")
                    }

                        d3. 如果newOffset没有问题,则创建日志段对应的各种索引文件。

                      else {
                      //代码走这里说明newOffset正常
                      val offsetIdxFile = offsetIndexFile(dir, newOffset)//生成index文件名
                      val timeIdxFile = timeIndexFile(dir, newOffset)//生成timeindex文件名
                      val txnIdxFile = transactionIndexFile(dir, newOffset)//生成txnindex文件名
                      //如果文件已存在则删除
                      for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
                      warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first")
                      Files.delete(file.toPath)
                      }


                      Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment())
                      }

                      e. 生成新的日志段对象

                        //生成新的日志段对象
                        val segment = LogSegment.open(dir,
                        baseOffset = newOffset,
                        config,
                        time = time,
                        fileAlreadyExists = false,
                        initFileSize = initFileSize,
                        preallocate = config.preallocate)

                        f. 将生成的日志段对象添加到集合segments中,这个集合用的就是ConcurrentSkipListMap跳表这种数据结构

                          //将新的日志段对象添加到Map
                          addSegment(segment)

                          g. 更新LEO

                            //更新LEO
                            updateLogEndOffset(nextOffsetMetadata.messageOffset)

                            h. 调度异步程序刷写旧的日志段文件

                              scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)

                              i. 返回新生成日志段对象

                                  2.滚动生成新日志段的条件剖析:

                                  直接查看调用roll方法的地方,在Log.maybeRoll方法中,该方法的作用是:如果有必要,滚动生成一个新的日志段对象返回;否则返回当前 active 日志段对象:

                                private def maybeRoll(messagesSize: Int, appendInfo: LogAppendInfo): LogSegment = {
                                //获取当前active日志段对象
                                val segment = activeSegment
                                //获取当前时间
                                val now = time.milliseconds
                                //消息中的最大时间戳
                                val maxTimestampInMessages = appendInfo.maxTimestamp
                                //消息中的最大偏移量
                                val maxOffsetInMessages = appendInfo.lastOffset
                                //如果需要滚动生成新的日志段对象
                                if (segment.shouldRoll(RollParams(config, appendInfo, messagesSize, now))) {
                                debug(s"Rolling new log segment (log_size = ${segment.size}/${config.segmentSize}}, " +
                                s"offset_index_size = ${segment.offsetIndex.entries}/${segment.offsetIndex.maxEntries}, " +
                                s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " +
                                s"inactive_time_ms = ${segment.timeWaitedForRoll(now, maxTimestampInMessages)}/${config.segmentMs - segment.rollJitterMs}).")


                                appendInfo.firstOffset match {
                                case Some(firstOffset) => roll(Some(firstOffset))
                                case None => roll(Some(maxOffsetInMessages - Integer.MAX_VALUE))
                                }
                                } else {
                                segment
                                }
                                }

                                其中判断是否需要滚动生成新的日志段对象调用的是LogSegment.shouldRoll方法:

                                  segment.shouldRoll(RollParams(config, appendInfo, messagesSize, now))

                                  LogSegment.shouldRoll方法:

                                    def shouldRoll(rollParams: RollParams): Boolean = {
                                      //日志段等待滚动的时间 > 日志段保留的最大时间 - 扰动时间 即:如果为true,表示可以滚动了
                                    //maxTimestampInMessages:待写入消息中的最大时间戳
                                      //maxSegmentMs:日志段保留的最大时间,默认168小时,由参数 log.roll.hours 配置
                                    //rollJitterMs:扰动值,避免同一时间生成大量的日志段文件给磁盘带来的压力
                                    val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs
                                    //滚动条件:①日志段大小超过maxSegmentBytes(默认1G)
                                    size > rollParams.maxSegmentBytes - rollParams.messagesSize ||
                                    //②日志段不为空,且等待滚动时间超过168h
                                    (size > 0 && reachedRollMs) ||
                                    //③偏移量索引文件或者时间戳索引文件大小超过10M ④最大偏移量和baseOffset的差值超过Int.MaxValue
                                    offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(rollParams.maxOffsetInMessages)//判断是否可以将日志段中最大的偏移量转为相对偏移量
                                    }


                                    a. timeWaitedForRoll方法:计算日志段等待滚动的时间:

                                      def timeWaitedForRoll(now: Long, messageTimestamp: Long) : Long = {
                                      //加载日志段中第一个批次的最大时间戳
                                        loadFirstBatchTimestamp()
                                      rollingBasedTimestamp match {
                                          //如果 rollingBasedTimestamp不为None,则返回当前待写入消息的最大时间戳 - rollingBasedTimestamp 的值
                                      case Some(t) if t >= 0 => messageTimestamp - t
                                      //如果 rollingBasedTimestamp 为None,则返回当前时间 - 日志段的创建时间
                                      case _ => now - created
                                      }
                                      }
                                      其中loadFirstBatchTimestamp方法的作用是:如果变量rollingBasedTimestamp为None,则加载日志段中第一个批次的最大时间戳(如果日志段为空,那么该变量还是None);如果不为None则什么都不做
                                        private def loadFirstBatchTimestamp(): Unit = {
                                        if (rollingBasedTimestamp.isEmpty) {
                                        val iter = log.batches.iterator()
                                        if (iter.hasNext)
                                        rollingBasedTimestamp = Some(iter.next().maxTimestamp)
                                        }
                                        }
                                        这里再看下变量rollingBasedTimestamp如果不为None,是什么时候被赋值的,在LogSegment.append()方法中,其值是第一次写入的消息集合的最大时间戳
                                          //获取日志段追加数据的起始物理地址
                                          val physicalPosition = log.sizeInBytes()
                                          //如果物理地址为0,说明日志段为空,即当前为第一次写入消息
                                          if (physicalPosition == 0)
                                          //更新用于日志段切分的时间戳为当前消息集合的最大时间戳
                                          rollingBasedTimestamp = Some(largestTimestamp)

                                          所以日志段等待滚动的时间的计算逻辑为:

                                          • 如果日志段为空:当前时间 -  日志段的创建时间
                                          • 如果日志段不为空:待写入消息集合的最大时间戳 - 第一次写入消息集合的最大时间戳
                                          b. 如果日志段等待滚动的时间超过了最大的保留时间  - 扰动值,则表示应该滚动生成新日志段了
                                          • rollParams.maxSegmentMs:日志段时间维度的阈值,由服务端参数:log.roll.hours 和 log.roll.ms 配置,log.roll.ms 优先级高。但默认情况下只配置了 log.roll.hours ,为168,即7天。如果等待超过这个时间该日志段还未写满,则滚动生成新的日志段(如果日志段为空则不会滚动)。

                                          • rollJitterMs:扰动值。避免在同一时间生成大量的日志段文件给磁盘IO带来压力

                                            val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs

                                            reachedRollMs:Boolean值,标记时间维度是否满足滚动条件。

                                            c. 滚动生成新日志段的条件:

                                                c1. 日志段容量达到阈值:

                                              size > rollParams.maxSegmentBytes - rollParams.messagesSize
                                              =>
                                              size + rollParams.messagesSize > rollParams.maxSegmentBytes
                                              即已经写入的字节 + 待写入消息字节 > 日志段最大容量
                                              • rollParams.maxSegmentBytes:日志段文件的最大容量。由服务端参数:log.segment.bytes 配置,默认为1G。

                                                  c2. 日志段不为空,且等待滚动的时间达到阈值。这里注意:如果日志段为空,即使达到了等待滚动的时间,也不会滚动生成新的日志段
                                                (size > 0 && reachedRollMs)
                                                    c3. 索引文件容量达到阈值
                                                  offsetIndex.isFull || timeIndex.isFull

                                                      索引文件的阈值由服务端参数:log.index.size.max.bytes  配置,默认为10M。

                                                      c4. 待写入消息的最大偏移量 - 日志段的起始偏移量baseOffset 超过Int.MaxValue,无法转为相对偏移量:
                                                    !canConvertToRelativeOffset(rollParams.maxOffsetInMessages)


                                                    总结:
                                                        日志段滚动的条件有4个:
                                                    • 日志段容量超过阈值:默认为1G
                                                    • 日志段不为空且等待滚动的时间超过阈值:默认为7天
                                                    • 日志段索引文件(包括偏移量索引文件和时间戳索引文件)容量超过阈值:默认为10M
                                                    • 待写入消息集合的最大偏移量 - 日志段起始偏移量 > Int.MaxValue,此时无法转为相对偏移量
                                                    文章转载自大数据记事本,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                    评论