暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

深入理解Kafka服务端之副本管理器介绍及Leader副本和Follower副本的切换

大数据记事本 2020-12-16
351
一、场景分析
    ReplicaManager:副本管理器。在之前多次提到这个组件,比如处理生产者的写数据请求、处理 Follower副本或者消费者拉取数据的请求等都是通过 ReplicaManager 的方法进行处理的。在上一篇分析 ISR 列表伸缩时,ISR 列表相关的定时任务也是在 ReplicaManager 中启动的,可见其重要性。
    从名字来看,这个组件是用来管理节点上所有副本的,但其实它并非直接对副本进行管理,而是直接管理了分区对象,再由分区对象间接管理了分区下的副本。

    在 Kafka 的运行过程中,难免会发生分区 Leader 副本不可用的现象,这时就需要在 ISR 列表中选择一个 Follower 副本切换为 Leader 副本对外提供服务,那么 Follower 副本和 Leader 副本之间的切换就是副本管理器进行管理的,具体如何进行切换,这篇进行详细说明。

二、图示说明
becomeLeaderOrFollower() 方法流程说明:

三、源码分析

1.看一下ReplicaManager.scala 文件的结构 ,其中定义的类和对象如下:
  • ReplicaManager 类:它是副本管理器的具体实现代码,里面定义了读写副本、删除副本消息的方法以及其他管理方法。
  • ReplicaManager 对象:ReplicaManager 类的伴生对象,定义了 几个常量。
  • FetchPartitionData类:定义获取到的分区数据以及重要元数据信息,如高水位值、Log Start Offset 值等。
  • LogReadResult 类:表示副本管理器从副本本地日志读取到的消息数据以及相关元数据信息,如高水位值、Log Start Offset 值等。
  • LogReadResult 对象:LogReadResult 类的伴生对象,只定义了一个UnknownLogReadResult常量
  • LogDeleteRecordsResult类:表示副本管理器执行副本日志删除操作后返回的结果信息。
  • LogAppendResult类:表示副本管理器执行副本日志写入操作后返回的结果信息。
其中,ReplicaManager 类的定义如下:
    class ReplicaManager(val config: KafkaConfig,//配置管理类
    metrics: Metrics,//监控指标类
    time: Time,
    val zkClient: KafkaZkClient,//zk客户端对象
    scheduler: Scheduler,//调度器
    val logManager: LogManager,//日志管理器
    val isShuttingDown: AtomicBoolean,//是否已经关闭
    quotaManagers: QuotaManagers,//配额管理器
    val brokerTopicStats: BrokerTopicStats,
    val metadataCache: MetadataCache,//Broker元数据缓存
    logDirFailureChannel: LogDirFailureChannel,//失效日志路径的处理类
    // 处理延时PRODUCE生产请求的Purgatory
    val delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
    // 处理延时FETCH拉取请求的Purgatory
    val delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
    // 处理延时DELETE_RECORDS删除请求的Purgatory
    val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
    // 处理延时ELECT_LEADERS leader选举请求的Purgatory
    val delayedElectPreferredLeaderPurgatory: DelayedOperationPurgatory[DelayedElectPreferredLeader],
    threadNamePrefix: Option[String]) extends Logging with KafkaMetricsGroup {
    ...
    }
    ReplicaManager类中关键属性的含义:
    • logManager:日志管理器。它负责创建和管理分区的日志对象,里面定义了很多操作日志对象的方法,如 getOrCreateLog 等。
    • metadataCache:Broker 端的元数据缓存。保存集群上分区的 Leader、ISR 等信息。每台 Broker 上的元数据缓存,是从 Controller 端的元数据缓存异步同步过来的。
    • 四个Purgatory相关的属性:这 4 个属性是 delayedProducePurgatory、delayedFetchPurgatory、delayedDeleteRecordsPurgatory 和 delayedElectLeaderPurgatory,它们分别管理 4 类延时请求:对应分别存放延时生产者请求、延时消费者请求、延时消息删除请求和延时 Leader 选举请求。

    在副本管理过程中,状态的变更大多都会引发对延时请求的处理。

    ReplicaManager类中定义的几个关键变量:

    • controllerEpoch:控制器的纪元值,用于隔离控制器的请求,如果控制器请求携带的epoch值小于该值,说明是过期的controller发送的请求,不予处理
    • localBrokerId:当前broker节点的id值
    • allPartitions:保存当前broker上所有分区对象的对象池。ReplicaManager对于分区的管理主要就是通过分区对象Partition实现的
    • replicaFetcherManager:用于创建ReplicaFetcherThread线程,比如其关键的createFetcherThread() 方法:
      override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = {
      val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
      val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
      //创建ReplicaFetcherThread线程实例对象并返回
      new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, brokerConfig, failedPartitions, replicaManager,
      metrics, time, quotaManager)
      }

      2.ReplicaManager如何进行数据的读写

          之前在分析 ProduceRequest 和 FetchRequest 时已经分析过 ReplicaManager 的写数据和读数据的方法,这里主要看一下 ReplicaManager 是如何将对应的延时请求加入到Purgatory中的
          副本写入数据的方法:appendRecords()

          Kafka 中需要进行副本写入的场景:

      • 场景一:生产者向 Leader 副本写入消息;

      • 场景二:Follower 副本拉取消息后写入自身存储;

      • 场景三:消费者组写入组信息;

      • 场景四:事务管理器写入事务信息(包括事务标记、事务元数据等)。

          除了场景二是直接调用的 Partition 的方法实现,其他场景都是调用 ReplicaManager.appendRecords 实现的

      这里不详细分析 appendRecords() 方法,只需要知道:

      • 将数据追加到本地日志对应的方法是:appendToLocalLog()

      • 判断是否需要等待其他副本写入的方法是:delayedProduceRequestRequired()

        private def delayedProduceRequestRequired(requiredAcks: Short,
        entriesPerPartition: Map[TopicPartition, MemoryRecords],
        localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
        requiredAcks == -1 &&
        entriesPerPartition.nonEmpty &&
        localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
        }
            该方法返回true时需要等待其他副本写入,否则不需要。而判断是否需要等待其它副本写入的条件有三个:
        • requiredAcks 必须等于 -1
        • 依然有数据尚未写完
        • 至少有一个分区的消息已经成功地被写入到本地日志。
            如果需要等待其他副本写入,说明有部分分区写入未完成,那么会将本次写入的请求交给管理延时生产请求的 delayedProducePurgatory,进行对应的延时处理
          val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
          //创建DelayedProduce延时请求对象
          val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
          val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
          /再一次尝试完成该延时请求
          //如果暂时无法完成,则将对象放入到相应的Purgatory中等待后续处理
          delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)

              副本读取数据的方法:fetchMessages()

            def fetchMessages(timeout: Long,//请求处理的超时时间
            replicaId: Int,//副本ID。对于消费者来说,这个值为-1,如果是Follower副本,该值就是Follower副本所在节点的Broker ID
            fetchMinBytes: Int,//能够获取的最小字节数
            fetchMaxBytes: Int,//能够获取的最大字节数
            hardMaxBytesLimit: Boolean,//是否能超过最大字节数限制,如果为True,则表示返回的数据字节数绝对不允许超过最大字节数
            fetchInfos: Seq[(TopicPartition, PartitionData)],//读取分区的信息,比如读哪些分区,从分区的哪个偏移量开始读取,最多读取多少字节等
            quota: ReplicaQuota = UnboundedQuota,//配额控制,判断是否需要在读取过程中做限速控制
            responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,//回调逻辑
            isolationLevel: IsolationLevel) {
              ...
            if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
            //如果满足一个条件,则返回
            val fetchPartitionData = logReadResults.map { case (tp, result) =>
            tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
            result.lastStableOffset, result.info.abortedTransactions)
            }
            //调用回调函数
            responseCallback(fetchPartitionData)


            //如果不满足上面4个条件,那么拉取请求会进行等待
            } else {


            val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]
            fetchInfos.foreach { case (topicPartition, partitionData) =>
            logReadResultMap.get(topicPartition).foreach(logReadResult => {
            val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
            fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
            })
            }
            val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
            fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
            //构建DelayedFetch延时请求对象
            val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)


            val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }


            //再次尝试完成请求,如果依然不能完成,则交由Purgatory等待后续处理
            delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
              }
            }
            如果拉取数据的过程中满足以下四个条件之一,便会立即返回响应:
            • 没有设置超时时间,说明请求发送方希望请求被处理后立即返回
            • 拉取请求中没有携带任何要拉取分区的信息
            • 已经拉取到了足够多的数据
            • 拉取过程中出错

                如果不满足任何一个条件,那么就会将本次的拉取请求交给管理延时拉取请求的delayedFetchPurgatory,进行对应的延时处理

            3.ReplicaManager对分区及副本的管理
                上面提到,ReplicaManager 定义了一个 allPartitions 变量,将当前 broker 节点下的所有分区对象保存在对象池中。每个分区对象下面又管理了一组 Replica 副本对象。通过这样的层级关系,副本管理器实现了对分区对象的直接管理和对副本对象的间接管理。可以理解为:ReplicaManager 通过直接操作分区对象来间接管理下属的副本对象。
            4.ReplicaManager如何进行Leader和Follower副本的切换

                Leader 副本和 Follower 副本的角色并不是固定不变的,如果 Leader 所在的节点发生宕机,那么之前的 Follower 副本可能会被选为 Leader 副本,这些变更是通过 Controller 给 broker 发送 LeaderAndIsrRequest 请求来实现的。而 ReplicaManager 处理 LeaderAndIsrRequest  请求的方法为 becomeLeaderOrFollower() :

              def becomeLeaderOrFollower(correlationId: Int,
              leaderAndIsrRequest: LeaderAndIsrRequest,
              onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
              ...
              replicaStateChangeLock synchronized {
              //如果leaderAndIsrRequest请求中携带的Controller Epoch小于当前节点的controllerEpoch,说明是过期Controller发送的请求
              if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller ${leaderAndIsrRequest.controllerId} with " +
              s"correlation id $correlationId since its controller epoch ${leaderAndIsrRequest.controllerEpoch} is old. " +
              s"Latest known controller epoch is $controllerEpoch")
              //抛出响应的异常
              leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception)
              } else {
              //初始化一个结果集,保存分区和对应处理结果
              val responseMap = new mutable.HashMap[TopicPartition, Errors]
              //获取控制器节点的BrokerId
              val controllerId = leaderAndIsrRequest.controllerId
              //更新当前的Controller Epoch 值
              controllerEpoch = leaderAndIsrRequest.controllerEpoch


              //定义存储分区信息的集合
              val partitionState = new mutable.HashMap[Partition, LeaderAndIsrRequest.PartitionState]()
              val updatedPartitions = new mutable.HashSet[Partition]
              // 遍历LeaderAndIsrRequest请求中的所有分区状态,进行Leader Epoch校验
              leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
              // 从allPartitions中获取对应分区对象,如果没有则创建
              val partition = getOrCreatePartition(topicPartition)
              //获取当前分区对象的Leader Epoch
              val currentLeaderEpoch = partition.getLeaderEpoch
              //获取请求中携带的Leader Epoch
              val requestLeaderEpoch = stateInfo.basePartitionState.leaderEpoch
              //如果是离线分区,将该分区放入结果集,并封装一个KAFKA_STORAGE_ERROR错误
              if (partition eq ReplicaManager.OfflinePartition) {
              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
              s"controller $controllerId with correlation id $correlationId " +
              s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
              "partition is in an offline log directory")
              responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
              //如果请求中的Leader Epoch > 当前分区的Leader Epoch
              } else if (requestLeaderEpoch > currentLeaderEpoch) {
              //且分区的初始副本分配中包含当前节点
              if (stateInfo.basePartitionState.replicas.contains(localBrokerId)) {
              //将分区对象放入updatedPartitions集合
              updatedPartitions.add(partition)
              //将分区状态放入partitionState集合
              partitionState.put(partition, stateInfo)
              } else {
              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
              s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
              s"in assigned replica list ${stateInfo.basePartitionState.replicas.asScala.mkString(",")}")
              responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
              }
              //如果请求中的Leader Epoch < 当前分区的Leader Epoch,说明是过期leader的请求
              } else if (requestLeaderEpoch < currentLeaderEpoch) {
              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
              s"controller $controllerId with correlation id $correlationId " +
              s"epoch $controllerEpoch for partition $topicPartition since its associated " +
              s"leader epoch $requestLeaderEpoch is smaller than the current " +
              s"leader epoch $currentLeaderEpoch")
              //将该分区放入结果集,并封装一个STALE_CONTROLLER_EPOCH错误
              responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
              } else {
              stateChangeLogger.debug(s"Ignoring LeaderAndIsr request from " +
              s"controller $controllerId with correlation id $correlationId " +
              s"epoch $controllerEpoch for partition $topicPartition since its associated " +
              s"leader epoch $requestLeaderEpoch matches the current leader epoch")
              //将该分区放入结果集,并封装一个STALE_CONTROLLER_EPOCH错误
              responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
              }
              }
              // 确定Broker上副本是哪些分区的Leader副本
              val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) =>
              stateInfo.basePartitionState.leader == localBrokerId
              }
              // 确定Broker上副本是哪些分区的Follower副本
              val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys


              //为partitionsTobeLeader集合中的所有分区在本机的副本执行成为Leader的操作,调用makeLeaders方法
              val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
              makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
              else
              Set.empty[Partition]
              //为partitionsToBeFollower集合中的所有分区在本机的副本执行成为Follower的操作,调用makeFollowers方法
              val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
              makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap)
              else
              Set.empty[Partition]


              //遍历请求中的所有分区
              leaderAndIsrRequest.partitionStates.asScala.keys.foreach { topicPartition =>
              /*
              * 如果指定了一个离线的分区路径,那么通过getOrCreatePartition方法创建分区的本地副本抛出异常之前,getOrCreatePartition方法
              * 可能已经创建了一个对应的分区对象了,这时在allPartitions对象池中,就会添加一个Partition对象为空的TopicPartition,
              * 这里将这个空的Partition对象修改为OfflinePartition
              */
              if (localReplica(topicPartition).isEmpty && (allPartitions.get(topicPartition) ne ReplicaManager.OfflinePartition))
              //如果当前节点没有指定分区的本地副本,且该分区的状态为离线状态,则更新该分区为离线分区
              allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
              }


              if (!hwThreadInitialized) {
              // 启动高水位检查点专属线程
              // 定期将Broker上所有非Offline分区的高水位值写入到检查点文件,默认每5秒执行1次
              startHighWaterMarksCheckPointThread()
              hwThreadInitialized = true
              }


              val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState]
              //遍历前面Leader Epoch校验正常的分区,看是否需要做副本迁移
              for (partition <- updatedPartitions) {
              val topicPartition = partition.topicPartition
              //如果已经定义了指定分区对应的Future日志对象
              // 这里用于做副本迁移,当用户希望将副本从一个目录迁移到另一个目录时,会将迁移后的目录定义为-future后缀的目录
              //然后在该目录下创建future日志,当future日志追上current日志时,来替换当前分区的日志
              if (logManager.getLog(topicPartition, isFuture = true).isDefined) {
              partition.localReplica.foreach { replica =>
              val leader = BrokerEndPoint(config.brokerId, "localhost", -1)


              partition.getOrCreateReplica(Request.FutureLocalReplicaId, isNew = false)


              logManager.abortAndPauseCleaning(topicPartition)


              futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(leader,
              partition.getLeaderEpoch, replica.highWatermark.messageOffset))
              }
              }
              }
              //副本修改日志路径管理器拉取指定分区的数据到future副本
              replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)
              // 关闭空闲副本拉取线程
              replicaFetcherManager.shutdownIdleFetcherThreads()
              // 关闭空闲日志路径数据迁移线程
              replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
              // 执行Leader变更之后的回调逻辑
              onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
              //封装LeaderAndIsrResponse响应
              new LeaderAndIsrResponse(Errors.NONE, responseMap.asJava)
              }
              }
              }

              这个方法代码相对较多,可以分成三个部分来看

              1. 进行Controller Epoch校验,遍历请求中的分区,创建对应的Partition分区对象,进行Leader Epoch校验

              2. 执行让当前Broker上的本地副本成为Leader副本或者Follower副本的逻辑

              3. 进行扫尾工作并封装LeaderAndIsrResponse响应返回

              第一部分:

              a. 进行Controller Epoch校验,如果请求中的 Controller Epoch小于当前节点保存的Controller Epoch,说明是过期的 Controller 发送的请求,直接抛出异常
                if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
                ...
                //抛出响应的异常
                leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception)
                }
                b. 初始化结果集,用于保存分区和对应的处理结果
                  //初始化一个结果集,保存分区和对应处理结果
                  val responseMap = new mutable.HashMap[TopicPartition, Errors]
                  c. 从请求中获取控制器节点id,并更新当前节点的Controller Epoch
                    //获取控制器节点的BrokerId
                    val controllerId = leaderAndIsrRequest.controllerId
                    //更新当前的Controller Epoch 值
                    controllerEpoch = leaderAndIsrRequest.controllerEpoch
                    d. 定义存储分区->分区状态信息的集合 和 存储Leader Epoch校验通过的分区的集合
                      //定义存储分区信息的集合
                      val partitionState = new mutable.HashMap[Partition, LeaderAndIsrRequest.PartitionState]()
                      //定义Leader Epoch校验通过的分区集合
                      val updatedPartitions = new mutable.HashSet[Partition]
                      e. 遍历分区状态,进行Leader Epoch的校验。只有请求中携带的Leader Epoch 大于当前节点上保存的 Leader Epoch 的分区才可以通过校验,之后将分区对象和对应的分区状态信息存储到上面创建的两个集合中。对于校验未通过的分区,封装对应的错误信息并放入结果集中
                        // 遍历LeaderAndIsrRequest请求中的所有分区状态,进行Leader Epoch校验
                        leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
                        // 从allPartitions中获取对应分区对象,如果没有则创建
                        val partition = getOrCreatePartition(topicPartition)
                        //获取当前分区对象的Leader Epoch
                        val currentLeaderEpoch = partition.getLeaderEpoch
                        //获取请求中携带的Leader Epoch
                        val requestLeaderEpoch = stateInfo.basePartitionState.leaderEpoch
                        //如果是离线分区,将该分区放入结果集,并封装一个KAFKA_STORAGE_ERROR错误
                        if (partition eq ReplicaManager.OfflinePartition) {
                        ...
                        responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
                        //如果请求中的Leader Epoch > 当前分区的Leader Epoch
                        } else if (requestLeaderEpoch > currentLeaderEpoch) {
                        //且分区的初始副本分配中包含当前节点
                        if (stateInfo.basePartitionState.replicas.contains(localBrokerId)) {
                        //将分区对象放入updatedPartitions集合
                        updatedPartitions.add(partition)
                        //将分区状态放入partitionState集合
                        partitionState.put(partition, stateInfo)
                        } else {
                        ...
                        responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
                        }
                        //如果请求中的Leader Epoch < 当前分区的Leader Epoch,说明是过期leader的请求
                        } else if (requestLeaderEpoch < currentLeaderEpoch) {
                        ...
                        //将该分区放入结果集,并封装一个STALE_CONTROLLER_EPOCH错误
                        responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
                        } else {
                        ...
                        //将该分区放入结果集,并封装一个STALE_CONTROLLER_EPOCH错误
                        responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
                        }
                        }

                        第二部分:

                        a. 确定Broker上的本地副本是哪些分区的Leader副本,判断条件:请求分区信息中Leader副本的节点id = 当前broker节点id

                          // 确定Broker上的本地副本是哪些分区的Leader副本
                          val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) =>
                          stateInfo.basePartitionState.leader == localBrokerId
                          }
                          b. 确定Broker上的本地副本是哪些分区的Follower副本。判断条件:校验通过的分区集合 - 成为Leader副本的分区
                            // 确定Broker上的本地副本是哪些分区的Follower副本
                            val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys
                            c. 为partitionsTobeLeader集合中的所有分区在本机的副本执行成为Leader的操作,调用makeLeaders方法
                              //为partitionsTobeLeader集合中的所有分区在本机的副本执行成为Leader的操作,调用makeLeaders方法
                              val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
                              makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
                              else
                              Set.empty[Partition]
                              d. 为partitionsToBeFollower集合中的所有分区在本机的副本执行成为Follower的操作,调用makeFollowers方法
                                //为partitionsToBeFollower集合中的所有分区在本机的副本执行成为Follower的操作,调用makeFollowers方法
                                val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
                                makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap)
                                else
                                Set.empty[Partition]

                                下面重点分析一下 makeLeaders 方法和 makeFollowers 方法

                                makeLeaders 方法:
                                  private def makeLeaders(controllerId: Int,//控制器所在节点ID
                                  epoch: Int,//控制器纪元值
                                  partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],//要成为Leader副本的分区信息集合
                                  correlationId: Int,//
                                  responseMap: mutable.Map[TopicPartition, Errors]//按照分区分组的错误信息
                                  ): Set[Partition] = {
                                  partitionState.keys.foreach { partition =>
                                  ...
                                  }
                                  //遍历要成为Leader副本的分区信息集合
                                  for (partition <- partitionState.keys)
                                  //初始化分区结果,给每个分区添加Errors.NONE信息
                                  responseMap.put(partition.topicPartition, Errors.NONE)
                                  //定义一个集合用于存储本地副本成为Leader副本的分区对象
                                  val partitionsToMakeLeaders = mutable.Set[Partition]()


                                  try {
                                  //停止本地副本请要成为Leader副本的所有分区的数据拉取
                                  replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition))
                                  //遍历更新所有要成为Leader副本的分区的分区信息
                                  partitionState.foreach{ case (partition, partitionStateInfo) =>
                                  try {
                                  //判断指定分区在本地的副本之前是否为Leader副本,如果之前就是Leader副本,makeLeader方法返回false,否则返回true,执行分支内容
                                  if (partition.makeLeader(controllerId, partitionStateInfo, correlationId)) {
                                  //将该副本对象放入集合
                                  partitionsToMakeLeaders += partition
                                  ...
                                  } else
                                  ...
                                  } catch {
                                  case e: KafkaStorageException =>
                                  ...
                                  val dirOpt = getLogDir(partition.topicPartition)
                                  error(s"Error while making broker the leader for partition $partition in dir $dirOpt", e)
                                  // 把KAFKA_SOTRAGE_ERRROR异常封装到Response中
                                  responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
                                  }
                                  }


                                  } catch {
                                  case e: Throwable =>
                                  ...
                                  throw e
                                  }


                                  partitionState.keys.foreach { partition =>
                                  ...
                                  }


                                  partitionsToMakeLeaders
                                  }
                                  这个方法的逻辑是:
                                  1. 给每个分区封装一个没有错误的结果放入结果集

                                  2. 定义一个集合用于存储本地副本成为 Leader 副本的分区对象

                                  3. 停止本地副本请要成为 Leader 副本的所有分区的数据拉取

                                  4. 遍历要成为 Leader 副本的分区集合,判断这个分区的本地副本是否之前就是 Leader 副本,如果不是,将该分区对象放入步骤 2 创建的集合

                                  5. 如果过程中发生异常,根据异常信息更新结果集中对应分区的结果

                                      其中判断给定分区的本地副本是否之前就是 Leader 副本是通过 Partition.makeLeader() 方法实现的。如果之前就是 Leader 副本,返回false;如果不在,返回true。makeLeader() 方法的逻辑如下:

                                    def makeLeader(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
                                    val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
                                    //获取控制器发来的分区副本所在节点的集合
                                    val newAssignedReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
                                    //更新controller epoch
                                    controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
                                    // add replicas that are new
                                    //根据控制器发来的ISR列表获取对应的副本对象集合
                                    val newInSyncReplicas = partitionStateInfo.basePartitionState.isr.asScala.map(r => getOrCreateReplica(r, partitionStateInfo.isNew)).toSet
                                    //移除本地保存的不在控制器发来的ISR列表中的副本对象
                                    (assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)
                                    //更新本地 ISR 列表
                                    inSyncReplicas = newInSyncReplicas


                                    newAssignedReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew))
                                    //获取本地副本对象
                                    val leaderReplica = localReplicaOrException
                                    //将本地副本对象的LEO作为新的Leader Epoch的StartOffset
                                    val leaderEpochStartOffset = leaderReplica.logEndOffset
                                    info(s"$topicPartition starts at Leader Epoch ${partitionStateInfo.basePartitionState.leaderEpoch} from " +
                                    s"offset $leaderEpochStartOffset. Previous Leader Epoch was: $leaderEpoch")


                                    //更新Leader Epoch
                                    leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
                                    //更新Leader Epoch 对应的startOffset
                                    leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
                                    //更新zkVersion
                                    zkVersion = partitionStateInfo.basePartitionState.zkVersion


                                    // 尝试更新Leader Epoch和对应的StartOffset
                                    leaderReplica.log.foreach { log =>
                                    log.maybeAssignEpochStartOffset(leaderEpoch, leaderEpochStartOffset)
                                    }
                                    //标记将要称为Leader的副本是否是新转换成Leader副本的
                                    //如果leaderReplicaIdOpt为当前节点id,说明不是新转换成的,该分区之前的Leader副本就在当前节点上
                                    val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId)
                                    //获取当前Leader副本的LEO
                                    val curLeaderLogEndOffset = leaderReplica.logEndOffset
                                    val curTimeMs = time.milliseconds
                                    //遍历当前节点保存的所有的Follower副本对象
                                    (assignedReplicas - leaderReplica).foreach { replica =>


                                    val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L
                                    //更新所有Follower副本对象的_lastCaughtUpTimeMs/lastFetchLeaderLogEndOffset/lastFetchTimeMs
                                    replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
                                    }


                                    if (isNewLeader) {
                                    //定义Leader副本的HW
                                    leaderReplica.convertHWToLocalOffsetMetadata()
                                    //更新Leader副本的id为当前broker的id
                                    leaderReplicaIdOpt = Some(localBrokerId)
                                    //更新本地存储的所有Follower副本的LEO
                                    assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
                                    }
                                    (maybeIncrementLeaderHW(leaderReplica), isNewLeader)
                                    }
                                    //如果HW修改了,尝试执行之前延迟处理的请求
                                    if (leaderHWIncremented)
                                    tryCompleteDelayedRequests()
                                    isNewLeader
                                    }
                                    ①获取控制器请求中该分区的多个副本所在的节点id集合
                                      val newAssignedReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
                                      ②更新 controller epoch
                                        controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch

                                        ③根据控制器发来的 ISR 列表获取对应的副本对象集合。Partition对象中保存了该分区的所有副本对象,本地副本为 localReplica ,其他节点的副本为 remoteReplica,其中只有本地副本存在对应的日志文件

                                          val newInSyncReplicas = partitionStateInfo.basePartitionState.isr.asScala.map(r => getOrCreateReplica(r, partitionStateInfo.isNew)).toSet

                                          ④移除本地保存的不在控制器发来的ISR列表中的副本对象。假设该分区有3个副本,原来保存在[0,1,2]三个节点上,如果控制器请求中ISR列表变成了[1,2,3],则需要将原来节点0对应的副本对象移除

                                            (assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)
                                            ⑤更新本地ISR列表
                                              inSyncReplicas = newInSyncReplicas

                                              ⑥根据控制器提供的副本所在节点创建对应的副本对象。这个是为了创建那些不在ISR列表中的副本对象,比如分区有3个副本,但控制器发送请求时只有2个在ISR列表,那个不在ISR列表中的副本对象也需要被创建

                                                newAssignedReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew))
                                                ⑦获取本地副本对象。将本地副本对象的LEO值作为新LeaderEpoch的StartOffset值
                                                  //获取本地副本对象
                                                  val leaderReplica = localReplicaOrException
                                                  //将本地副本对象的LEO作为新的Leader Epoch的StartOffset
                                                  val leaderEpochStartOffset = leaderReplica.logEndOffset
                                                  ⑧更新Leader Epoch、StartOffset等信息
                                                    //更新Leader Epoch
                                                    leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
                                                    //更新Leader Epoch 对应的startOffset
                                                    leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
                                                    //更新zkVersion
                                                    zkVersion = partitionStateInfo.basePartitionState.zkVersion


                                                    // 尝试更新Log日志对象的Leader Epoch和对应的StartOffset
                                                    leaderReplica.log.foreach { log =>
                                                    log.maybeAssignEpochStartOffset(leaderEpoch, leaderEpochStartOffset)
                                                    }
                                                    ⑨标记该分区的Leader副本之前是不是在当前节点上。判断依据:分区的leaderReplicaIdOpt变量保存的Leader副本所在节点id =  当前节点id
                                                        这个标记值也就是makeLeader() 方法最后的返回值
                                                      //标记将要称为Leader的副本是否是新转换成Leader副本的
                                                      //如果leaderReplicaIdOpt为当前节点id,说明不是新转换成的,该分区之前的Leader副本就在当前节点上
                                                      val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId)
                                                      ⑩更新分区对象存储的所有Follower副本的关键变量信息,用于后续ISR列表的伸缩
                                                        //获取当前Leader副本的LEO
                                                        val curLeaderLogEndOffset = leaderReplica.logEndOffset
                                                        val curTimeMs = time.milliseconds
                                                        //遍历当前节点保存的所有的Follower副本对象
                                                        (assignedReplicas - leaderReplica).foreach { replica =>


                                                        val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L
                                                        //更新所有Follower副本对象的_lastCaughtUpTimeMs/lastFetchLeaderLogEndOffset/lastFetchTimeMs
                                                        replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
                                                        }

                                                        ⑾如果分区的Leader副本之前不在当前节点,则定义Leader副本的HW,并更新本地存储的所有Follower副本对象的LEO值

                                                          if (isNewLeader) {
                                                          //定义Leader副本的HW
                                                          leaderReplica.convertHWToLocalOffsetMetadata()
                                                          // mark local replica as the leader after converting hw
                                                          //更新Leader副本的id为当前broker的id
                                                          leaderReplicaIdOpt = Some(localBrokerId)
                                                          // reset log end offset for remote replicas
                                                          //更新本地存储的所有Follower副本的LEO
                                                          assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
                                                          }

                                                          ⑿如果提高了HW值,尝试处理之前未处理的延时请求

                                                            if (leaderHWIncremented)
                                                            tryCompleteDelayedRequests()

                                                            ⒀返回标记值

                                                            makeFollowers 方法:
                                                              private def makeFollowers(controllerId: Int,
                                                              epoch: Int,
                                                              partitionStates: Map[Partition, LeaderAndIsrRequest.PartitionState],
                                                              correlationId: Int,
                                                              responseMap: mutable.Map[TopicPartition, Errors]) : Set[Partition] = {
                                                              partitionStates.foreach { case (partition, partitionState) =>
                                                              ...
                                                              }
                                                              //遍历所有分区,将分区的异常类型初始化为Errors.NONE
                                                              for (partition <- partitionStates.keys)
                                                              responseMap.put(partition.topicPartition, Errors.NONE)
                                                              //定义一个集合,用于存放在当前节点上本地副本变更为Follower副本的分区
                                                              val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()


                                                              try {
                                                              partitionStates.foreach { case (partition, partitionStateInfo) =>
                                                              //获取控制器发来的分区Leader副本的brokerId
                                                              val newLeaderBrokerId = partitionStateInfo.basePartitionState.leader
                                                              try {
                                                                        metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
                                                              //在元数据缓存中找到Leader副本的节点信息
                                                              case Some(_) =>
                                                              //如果给定分区的本地副本是转换成Follower副本的,makeFollower返回true,并执行一些列操作
                                                              if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
                                                              //将副本对象添加到本地副本新转换成Follower副本的集合
                                                              partitionsToMakeFollower += partition
                                                              else
                                                                             ...
                                                              //如果元数据缓存中没有指定Leader副本所在节点的信息,说明Leader副本所在的节点不可用
                                                              case None =>
                                                              ...
                                                              //创建一个本地副本对象,初始化日志对象
                                                              partition.getOrCreateReplica(localBrokerId, isNew = partitionStateInfo.isNew)
                                                              }
                                                              } catch {
                                                              case e: KafkaStorageException =>
                                                              ...
                                                              val dirOpt = getLogDir(partition.topicPartition)
                                                              ...
                                                              responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
                                                              }
                                                              }
                                                              //关闭现有的Fetcher线程
                                                              replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
                                                              partitionsToMakeFollower.foreach { partition =>
                                                              ...
                                                              }
                                                              //遍历本地副本成为Follower副本的分区,尝试完成延迟请求
                                                              partitionsToMakeFollower.foreach { partition =>
                                                              val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)
                                                              tryCompleteDelayedProduce(topicPartitionOperationKey)
                                                              tryCompleteDelayedFetch(topicPartitionOperationKey)
                                                              }


                                                              partitionsToMakeFollower.foreach { partition =>
                                                              ...
                                                              }


                                                              if (isShuttingDown.get()) {
                                                              partitionsToMakeFollower.foreach { partition =>
                                                              ...
                                                              }
                                                              }
                                                              else {
                                                              val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>
                                                              //找到分区Leader副本所在节点id
                                                              val leader = metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get
                                                              .brokerEndPoint(config.interBrokerListenerName)
                                                              //获取要拉取数据的起始偏移量
                                                              val fetchOffset = partition.localReplicaOrException.highWatermark.messageOffset
                                                              partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset)
                                                              }.toMap
                                                              //TODO 添加新的拉取线程,让Follower Replica 去Leader Replica 拉取数据
                                                              replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)


                                                              partitionsToMakeFollower.foreach { partition =>
                                                              ...
                                                              }
                                                              }
                                                              } catch {
                                                              case e: Throwable =>
                                                              ...
                                                              throw e
                                                              }


                                                              partitionStates.keys.foreach { partition =>
                                                              ...
                                                                  }
                                                              partitionsToMakeFollower
                                                              }
                                                              这个方法的逻辑是:
                                                              1. 给每个分区封装一个没有错误的结果放入结果集
                                                              2. 定义一个集合,用于存放当前节点上本地副本变更为Follower副本的分区
                                                              3. 遍历要成为 Follower 副本的分区,获取控制器中对应分区 Leader 副本的节点id,然后在当前节点的元数据缓存中查找该 Leader 副本的节点信息
                                                              4. 如果元数据缓存中有该节点信息,说明 Leader 副本可用。判断该分区的本地副本之前是否为 Follower 副本,如果不是,说明是切换为 Follower 副本的,将该分区添加到步骤2创建的集合中
                                                              5. 如果元数据缓存中没有该节点信息,说明 Leader 副本不可用。此时创建一个分区对应的本地副本,初始化一些列对应的文件
                                                              6. 如果过程中发送异常,根据异常信息更新结果集中对应分区的结果
                                                              7. 关闭所有切换为 Follower 副本的分区的数据拉取线程
                                                              8. 遍历切换为 Follower 副本的分区,尝试完成延时请求
                                                              9. 如果ReplicaManager未关闭,遍历所有切换为 Follower 副本的分区,找到其 Leader 副本所在的节点,获取拉取数据的起始偏移量,然后启动新的数据拉取线程去 Leader 副本拉取数据
                                                                  其中判断给定分区的本地副本以前是不是 Follower 副本是通过 Partition.makeFollower() 方法实现的。如果从控制器获取的 Leader 副本的节点 id 和Leader Epoch均和本地存储的 一致,说明 Leader 副本没有切换,本地副本之前就是 Follower 副本,返回false;否则返回true。makeFollower() 方法的逻辑如下:
                                                                def makeFollower(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
                                                                inWriteLock(leaderIsrUpdateLock) {
                                                                //获取控制器发来的分区所有副本所在节点的brokerId集合
                                                                val newAssignedReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
                                                                //获取分区Leader副本的节点id
                                                                val newLeaderBrokerId = partitionStateInfo.basePartitionState.leader
                                                                //获取当前分区的Leader Epoch
                                                                    val oldLeaderEpoch = leaderEpoch
                                                                //更新controller epoch
                                                                    controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
                                                                //创建控制器分配的所有副本对象
                                                                    newAssignedReplicas.foreach(r => getOrCreateReplica(r, partitionStateInfo.isNew))
                                                                //移除多余的副本对象(假设之前副本在0,1,2节点上,现在控制器要求副本在1,2,3节点上,那么就移除本地存储的节点0对应的副本对象)
                                                                (assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)
                                                                //清空ISR列表
                                                                inSyncReplicas = Set.empty[Replica]
                                                                //更新Leader Epoch
                                                                leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
                                                                //更新Leader Epoch对应的StartOffset
                                                                leaderEpochStartOffsetOpt = None
                                                                //更新zkVersion
                                                                zkVersion = partitionStateInfo.basePartitionState.zkVersion
                                                                //如果控制器分配的Leader副本就是之前的Leader副本,且Leader Epoch没有改变,说明Leader副本没有切换,那么返回false
                                                                // 否则更新leaderReplicaIdOpt信息,返回true
                                                                if (leaderReplicaIdOpt.contains(newLeaderBrokerId) && leaderEpoch == oldLeaderEpoch) {
                                                                false
                                                                } else {
                                                                leaderReplicaIdOpt = Some(newLeaderBrokerId)
                                                                true
                                                                }
                                                                }
                                                                }

                                                                    这个方法的流程相对简单,已经加了注释,就不逐步分析了。这里需要注意一个点:如果本地副本为 Follower 副本,那么会清空 ISR 列表,即本地副本为 Follower 副本的分区对象中不存储 ISR 列表信息,只有本地副本为 Leader 副本的分区对象才会存储 ISR 列表信息。

                                                                第三部分:

                                                                    if (!hwThreadInitialized) {
                                                                  // 启动高水位检查点线程
                                                                  // 定期将Broker上所有非Offline分区的高水位值写入到检查点文件,默认每5秒执行1次
                                                                  startHighWaterMarksCheckPointThread()
                                                                  hwThreadInitialized = true
                                                                  }


                                                                  val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState]
                                                                  //遍历前面Leader Epoch校验正常的分区,看是否需要做副本迁移
                                                                  for (partition <- updatedPartitions) {
                                                                  val topicPartition = partition.topicPartition
                                                                  //如果已经定义了指定分区对应的Future日志对象
                                                                  // 这里用于做副本迁移,当用户希望将副本从一个目录迁移到另一个目录时,会将迁移后的目录定义为-future后缀的目录
                                                                  //然后在该目录下创建future日志,当future日志追上current日志时,来替换当前分区的日志
                                                                  if (logManager.getLog(topicPartition, isFuture = true).isDefined) {
                                                                  partition.localReplica.foreach { replica =>
                                                                  val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
                                                                    
                                                                  //在分区的副本集合中添加Future副本
                                                                  partition.getOrCreateReplica(Request.FutureLocalReplicaId, isNew = false)
                                                                    
                                                                  //暂停分区日志的清理
                                                                  logManager.abortAndPauseCleaning(topicPartition)


                                                                  futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(leader,
                                                                  partition.getLeaderEpoch, replica.highWatermark.messageOffset))
                                                                  }
                                                                  }
                                                                  }
                                                                  //对指定的分区进行日志数据的迁移
                                                                  replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)
                                                                  // 关闭空闲副本拉取线程
                                                                  replicaFetcherManager.shutdownIdleFetcherThreads()
                                                                  // 关闭空闲日志路径数据迁移线程
                                                                  replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
                                                                  // 执行Leader变更之后的回调逻辑
                                                                  onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
                                                                  //封装LeaderAndIsrResponse响应
                                                                  new LeaderAndIsrResponse(Errors.NONE, responseMap.asJava)
                                                                  }
                                                                  在执行完 Leader 副本和 Follower 副本的切换之后,后续操作和封装响应的逻辑如下:
                                                                  1. 启动高水位检查点线程,定期将所有非离线分区的高水位值写入检查点文件
                                                                  2. 遍历Leader Epoch校验正常的分区,看是否需要进行副本迁移(就是本地副本对应日志文件的迁移),判断依据是该分区是否定义了 Future 日志对象。如果需要进行副本迁移,则在指定分区的副本集合中增加Future 副本对象,然后暂停分区的日志清理。
                                                                  3. 对指定的分区进行日志数据的迁移
                                                                  4. 关闭空闲副本拉取线程
                                                                  5. 关闭空闲日志路径数据迁移线程
                                                                  6. 执行 Leader 副本变更之后的回调逻辑
                                                                  7. 封装 LeaderAndIsrResponse 响应并返回
                                                                  总结:
                                                                  • ReplicaManager 管理了节点上的所有分区对象,而分区对象中管理了下面的一组副本对象。ReplicaManager 通过对分区对象的直接管理实现对副本对象的间接管理
                                                                  • 副本读写数据的操作由 ReplicaManager 管理
                                                                  • 副本状态的切换是由控制器发送 LeaderAndISRRequest 请求来管理的
                                                                  • 执行副本状态的切换是在 ReplicaManager 的 becomeLeaderOrFollower 方法中进行的
                                                                  • 切换为 Leader 副本的逻辑通过 ReplicaManager 的 makeLeaders 方法实现
                                                                  • 切换为 Follower 副本的逻辑通过 ReplicaManager 的 makeFollowers 方法实现
                                                                  • 只有本地副本为 Leader 副本的分区对象会保存 ISR 列表信息,本地副本为 Follower 副本的分区对象是不存储 ISR 列表信息的

                                                                  文章转载自大数据记事本,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                                  评论