暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

深入理解Kafka服务端之Coordinator简介及GroupMetadataManager组件介绍

大数据记事本 2020-12-29
842

一、场景分析

    在旧版的Kafka消费者客户端中,是利用 Zookeeper 的监听器来实现对应功能的,通过对 ZK 的/consumers/<group>/ids 路径子节点数量变化进行监听,识别消费者组下消费者的变化;通过对/brokers/ids 路径子节点数量变化进行监听,识别 Broker 节点的增减。通过这两个监听器,每个消费者就可以监听消费者组和集群 Broker 的增减,以此来触发 Rebalance 操作。
    在这种方式下,当触发 Rebalance 操作时,一个消费者组下的所有消费者同时进行 Rebalance 操作,而消费者之间是相互隔离的,并不知道其它消费者操作的结果,这样就可能导致 Kafka 工作在一个不正确的状态。而且严重依赖 Zookeeper 还有两个严重的问题:
  • 羊群效应:Zookeeper 中被监听的节点发生变化,大量的 Watcher 通知被发送到客户端,导致在通知期间的其他操作被延迟,也可能发生类似死锁的情况。

  • 脑裂问题:消费者组进行 Rebalance 操作时每个消费者都与 Zookeeper 进行通信以判断消费者或 Broker 变化的情况,由于 Zookeeper 本身的特性,可能导致同一时刻各个消费者获取的状态不一致,这样会导致异常。

Coordinator 机制
    在新版的消费者客户端中,引入了 ConsumerCoordinator 消费者协调器,对应的在服务端加入了 GroupCoordinator 组协调器。ConsumerCoordinator 负责与 GroupCoordinator 进行交互。
    在 KafkaConsumer 内部有一个 ConsumerCoordinator 类型的属性,在初始化 KafkaConsumer 时,会初始化对应的 ConsumerCoordinator。
    GroupCoordinator 是 Broker 端的一个服务,在 Broker 启动的时候会进行初始化并启动。GroupCoordinator 管理了所有消费者组的一个子集。这个子集的确定方式是:消费者组注册消息所在__consumer_offset 位移主题的分区的 Leader 副本如果在当前节点,那么当前节点上的 GroupCoordinator 就管理这个消费者组
    Coordinator 机制最重要的操作就是进行 Rebalance 的协调,这个操作相对复杂,这篇先不具体分析。这篇先介绍 GroupMetadataManager 组件的功能,GroupCoordinator 通过它来管理自己管辖内的所有消费者组元数据,该组件还提供了一些列方法供 GroupCoordinator 进行调用,来完成 Rebalance 操作。

二、图示说明

读取位移主题消息,填充消费者组元数据流程图解

三、源码分析

1. GroupMetadataManager 定义:
    class GroupMetadataManager(brokerId: Int,//当前节点id
    interBrokerProtocolVersion: ApiVersion,//Broker端参数inter.broker.protocol.version值
    config: OffsetConfig,//内部位移主题的配置类
    replicaManager: ReplicaManager,//副本管理器对象
    zkClient: KafkaZkClient,//zk客户端对象
    time: Time//时间工具类
    ) extends Logging with KafkaMetricsGroup {
    //位移主题消息的压缩格式
    private val compressionType: CompressionType = CompressionType.forId(config.offsetsTopicCompressionCodec.codec)
    //消费组元数据缓存,key是消费组group.id,value是组元数据对象
    private val groupMetadataCache = new Pool[String, GroupMetadata]


    private val partitionLock = new ReentrantLock()


    // 位移主题下正在执行加载操作的分区
    private val loadingPartitions: mutable.Set[Int] = mutable.Set()


    // 位移主题下完成加载操作的分区
    private val ownedPartitions: mutable.Set[Int] = mutable.Set()


    private val shuttingDown = new AtomicBoolean(false)


    // 位移主题总分区数,默认为50
    private val groupMetadataTopicPartitionCount = getGroupMetadataTopicPartitionCount


    //初始化一个调度器
    private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "group-metadata-manager-")
    ...
    }

    重点介绍下面几个变量:

    • groupMetadataCache:管理的所有消费者组的元数据,key表示消费者组id,即消费者指定的 group.id 参数;value 是 GroupMetadata 类型,表示该消费者组的元数据信息对象。这个变量是 GroupMetadataManager 中最重要的一个变量,加入消费者组或者移除消费者组都会操作这个变量

    • loadingPartitions:位移主题__consumer_offsets 下正在执行加载操作的分区号的集合。所谓的加载,是指读取位移主题消息数据,填充 groupMetadataCache 变量的操作。

    • ownedPartitions:位移主题__consumer_offsets 下完成加载操作的分区号的集合。

    • groupMetadataTopicPartitionCount:位移主题__consumer_offsets 的分区数量,默认为 50.

    • scheduler:调度器对象,用于调度定时任务,如 主题分区中过期消息的定期清理任务

    2. 重要方法简介

    GroupMetadataManager 定义的重要方法可以分为以下三类

    2.1 对消费者组元数据的管理。包括

        ①查询消费者组元数据

        ②添加消费者组元数据

        ③移除消费者组元数据

        ④将消费者组元数据对应的注册消息写入 __consumer_offsets 主题

    2.2 对消费者组消费位移的管理

        ①将位移提交消息写入 __consumer_offsets 主题

        ②查询消费者组已提交位移。(注意:这里是从 GroupMetadata 组元数据信息的缓存中进行查询)

    2.3 读取 __consumer_offsets 主题的消息,填充消费者组元数据缓存。

        为什么把这个操作单独拿出来呢?因为消费者程序在获取消费位移时,并不是去直接读取位移主题的消息,而是从对应消费者组的元数据缓存 GroupMetadata 中进行读取。而元数据缓存中的位移消息,是在消费者组被当前节点的 GroupCoordinator 管理时进行填充的。换句话说,就是消费者组注册消息所在的位移主题分区的 Leader 副本切换到当前节点上时,会将分区中所有消费者组的位移提交信息加载到元数据缓存。

    3. 重要方法详解

    3.1对管辖内所有消费者组元数据的管理:主要就是操作 groupMetadataCache 变量

    3.1.1 查询消费者组元数据

    getGroup:根据给定的 group.id ,获取对应的消费者组元数据对象。如果没有则返回None

      def getGroup(groupId: String): Option[GroupMetadata] = {
      Option(groupMetadataCache.get(groupId))
      }
      3.1.2 添加消费者组元数据

      addGroup:将给定的消费者组元数据添加到 groupMetadataCache 变量中

        def addGroup(group: GroupMetadata): GroupMetadata = {
        val currentGroup = groupMetadataCache.putIfNotExists(group.groupId, group)
        if (currentGroup != null) {
        currentGroup
        } else {
        group
        }
        }
        3.1.3 移除消费者组元数据

        removeGroupsForPartition:移除给定位移主题分区中存储的所有消费者组元数据。当 Broker 卸任某些消费者组的 Coordinator 角色时,它需要将这些消费者组的信息从 groupMetadataCache 中全部移除掉

          def removeGroupsForPartition(offsetsPartition: Int,//消费者组元数据消息所在的分区号
          onGroupUnloaded: GroupMetadata => Unit) {
          //获取位移主题分区对象
          val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
          info(s"Scheduling unloading of offsets and group metadata from $topicPartition")
          //创建异步任务,移除组信息和位移信息
          scheduler.schedule(topicPartition.toString, () => removeGroupsAndOffsets)
          //定义了一个内部方法,用于移除组信息和位移信息
          def removeGroupsAndOffsets() {
          var numOffsetsRemoved = 0
          var numGroupsRemoved = 0


          inLock(partitionLock) {
          // 移除已加载分区集合ownedPartitions中给定位移主题分区的记录
          ownedPartitions.remove(offsetsPartition)
          //遍历所有消费者组信息
          for (group <- groupMetadataCache.values) {
          //如果该消费者组元数据消息保存在给定的位移主题分区中
          if (partitionFor(group.groupId) == offsetsPartition) {
          //进行消费者组的卸载逻辑,这个逻辑是在上层调用时传入的,具体逻辑在GroupCoordinator.onGroupUnloaded方法中
          onGroupUnloaded(group)
          //将该组信息从元数据缓存中移除
          groupMetadataCache.remove(group.groupId, group)
          //把消费者组从producer对应的组集合中移除
          removeGroupFromAllProducers(group.groupId)
          //更新移除组数量
          numGroupsRemoved += 1
          //更新移除位移值数量
          numOffsetsRemoved += group.numOffsets
          }
          }
          }


          info(s"Finished unloading $topicPartition. Removed $numOffsetsRemoved cached offsets " +
          s"and $numGroupsRemoved cached groups.")
          }
          }

          该方法的逻辑是:

          • 第一步:根据给定的待移除分区号,封装位移主题分区对象

          • 第二步:创建一个异步的调度任务,将组信息和位移信息进行移除

          调度任务的流程是:

          • 首先,将给定的待移除分区号从已加载分区号集合中移除

          • 接着,遍历所有的消费者组元数据,选出元数据在待移除分区上的消费者组,执行下面的逻辑:

              ① 执行消费者组的卸载逻辑,该逻辑是在上层调用时传入的,具体逻辑在GroupCoordinator.onGroupUnloaded方法中,主要做两件事:①将消费者组状态变更到 Dead 状态;②封装异常表示 Coordinator 已发生变更,然后调用回调函数返回。

              ② 将该消费者组元数据从 groupMetadataCache 变量中移除

              ③ 把消费者组从producer对应的组集合中移除,这里的producer是事务中使用的

              ④ 更新移除的消费者组数量和移除的主题分区提交位移数量(即 GroupMetadata 的 offsets 变量对应集合的大小)

          3.1.4 将注册消息写入位移主题

          storeGroup:封装消费者组注册消息,写入位移主题
            def storeGroup(group: GroupMetadata,
            groupAssignment: Map[String, Array[Byte]],
            responseCallback: Errors => Unit): Unit = {
            //判断当前Broker是否是该消费者组的Coordinator
            getMagic(partitionFor(group.groupId)) match {
            //如果是该消费者组的Coordinator
                  case Some(magicValue) =>
            //定义写入消息的时间戳类型
            val timestampType = TimestampType.CREATE_TIME
            val timestamp = time.milliseconds()
            //构建写入消息的key,是一个字节数组
            val key = GroupMetadataManager.groupMetadataKey(group.groupId)
            //构建写入消息的value,也是一个字节数组
            val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, interBrokerProtocolVersion)
            //封装待写入的消费者组注册消息集合
            val records = {
            val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType,
            Seq(new SimpleRecord(timestamp, key, value)).asJava))
            val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L)
            builder.append(timestamp, key, value)
            builder.build()
            }
            //根据消费者组名称获取注册消息所在主题分区对象
            val groupMetadataPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
            val groupMetadataRecords = Map(groupMetadataPartition -> records)
            val generationId = group.generationId
            //回调函数,用于将消息写入位移主题后更新缓存中的消费者组元数据
                    def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
            if (responseStatus.size != 1 || !responseStatus.contains(groupMetadataPartition))
            throw new IllegalStateException("Append status %s should only have one partition %s"
                          .format(responseStatus, groupMetadataPartition))
            //获取主题分区的响应状态
            val status = responseStatus(groupMetadataPartition)
            //如果没有错误
            val responseError = if (status.error == Errors.NONE) {
            Errors.NONE
            //如果有错误,根据错误信息获取对象的错误响应状态
            } else {
            debug(s"Metadata from group ${group.groupId} with generation $generationId failed when appending to log " +
                          s"due to ${status.error.exceptionName}")
                      
            status.error match {
            case Errors.UNKNOWN_TOPIC_OR_PARTITION
            | Errors.NOT_ENOUGH_REPLICAS
            | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
            //Coordinator不可用
            Errors.COORDINATOR_NOT_AVAILABLE


            case Errors.NOT_LEADER_FOR_PARTITION
            | Errors.KAFKA_STORAGE_ERROR =>
            //未找到Coordinator
            Errors.NOT_COORDINATOR


            case Errors.REQUEST_TIMED_OUT =>
            //执行Rebalance
            Errors.REBALANCE_IN_PROGRESS


            case Errors.MESSAGE_TOO_LARGE
            | Errors.RECORD_LIST_TOO_LARGE
            | Errors.INVALID_FETCH_SIZE =>


            error(s"Appending metadata message for group ${group.groupId} generation $generationId failed due to " +
            s"${status.error.exceptionName}, returning UNKNOWN error code to the client")
            //未知的服务端错误
            Errors.UNKNOWN_SERVER_ERROR


            case other =>
            error(s"Appending metadata message for group ${group.groupId} generation $generationId failed " +
            s"due to unexpected error: ${status.error.exceptionName}")


            other
            }
            }
            //执行上层调用传入的回调函数
            responseCallback(responseError)
            }
            //向位移主题写入消费者组注册消息
            appendForGroup(group, groupMetadataRecords, putCacheCallback)
            //如果当前Broker不是该消费者组的Coordinator,则返回一个 NOT_COORDINATOR 错误
            case None =>
            responseCallback(Errors.NOT_COORDINATOR)
            None
            }
            }

            该方法的逻辑是:

            • 第一步:判断当前节点是否是管理该消费者组的 GroupCoordinator。判断依据是:尝试去获取位移主题目标分区的底层日志对象。如果能够获取到,就说明当前 Broker 是 Coordinator,程序进入到下一步;反之,则表明当前 Broker 不是 Coordinator,就构造一个 NOT_COORDINATOR 异常返回。

            • 第二步:设置时间戳类型,封装注册消息的 key 和 value,构建待写入的消费者组注册消息集合,即 MemoryRecords 对象

            • 第三步:根据消费者组名称,计算注册消息要写入的分区号,并构建 TopicPartition 主题分区对象

            • 第四步:调用 appendForGroup 方法,向位移主题写入第二步构建的消费者组注册消息集合

                在这个方法中定义了一个回调函数 putCacheCallback ,其逻辑时:根据写入主题分区的响应状态,执行上层调用传入的 responseCallback 回调函数。上层调用该方法的地方有两个,分别是 Rebalance 过程中执行 消费者加入组操作和 同步组操作。具体的回调函数逻辑在分析 Rebalance 操作时再具体分析。

                对于 appendForGroup 方法,其作用就是将构建的消息集合写入位移主题,和写入普通主题一样,调用的就是 ReplicaManager.appendRecords 方法

              private def appendForGroup(group: GroupMetadata,
              records: Map[TopicPartition, MemoryRecords],
              callback: Map[TopicPartition, PartitionResponse] => Unit): Unit = {
              //调用副本管理器的appendRecords方法向位移主题写入消息
              replicaManager.appendRecords(
              timeout = config.offsetCommitTimeoutMs.toLong,
              requiredAcks = config.offsetCommitRequiredAcks,
              internalTopicsAllowed = true,
              isFromClient = false,
              entriesPerPartition = records,
              delayedProduceLock = Some(group.lock),
              responseCallback = callback)
              }

              3.2 对管辖内所有消费者组消费位移的管理

              3.2.1 写入消费者组位移提交消息
              storeOffsets:
                def storeOffsets(group: GroupMetadata,//消费者组元数据
                consumerId: String,//消费者组中消费者成员ID
                offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],//待保存的位移值,按照分区分组
                responseCallback: immutable.Map[TopicPartition, Errors] => Unit,//处理完成后的回调函数
                producerId: Long = RecordBatch.NO_PRODUCER_ID,//事务型Producer ID
                producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH//事务型Producer Epoch值
                ): Unit = {
                //过滤出满足特定条件的待保存位移数据
                //特定条件:metadata为null或者长度小于4kb,metadata为自定义元数据,如果不手动设置的话为null
                val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
                validateOffsetMetadataLength(offsetAndMetadata.metadata)
                }


                ...


                val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID
                //如果没有任何分区的待保存位移满足特定条件
                if (filteredOffsetMetadata.isEmpty) {
                // 构造OFFSET_METADATA_TOO_LARGE异常并调用responseCallback返回
                val commitStatus = offsetMetadata.mapValues(_ => Errors.OFFSET_METADATA_TOO_LARGE)
                responseCallback(commitStatus)
                None
                } else {
                //查看当前Broker是否为给定消费者组的Coordinator
                getMagic(partitionFor(group.groupId)) match {
                //如果是Coordinator
                case Some(magicValue) =>
                //设置消息的时间戳类型
                val timestampType = TimestampType.CREATE_TIME
                val timestamp = time.milliseconds()
                //构造位移主题的位移提交消息
                val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
                val key = GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition)
                val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, interBrokerProtocolVersion)
                new SimpleRecord(timestamp, key, value)
                }
                //获取指定消费者组位移提交消息所在的主题分区对象
                val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
                //为写入消息申请内存Buffer
                val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava))


                if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2)
                throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting to make a transaction offset commit with an invalid magic: " + magicValue)
                //封装待写入消息集合MemoryRecords
                val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L, time.milliseconds(),
                producerId, producerEpoch, 0, isTxnOffsetCommit, RecordBatch.NO_PARTITION_LEADER_EPOCH)


                records.foreach(builder.append)
                val entries = Map(offsetTopicPartition -> builder.build())


                //在位移提交消息写入日志后,调用该回调函数来更新消费者组元数据,
                // 即将多个消费者组位移值填充到 GroupMetadata 的 offsets 元数据缓存中。
                def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
                //确保消息写入到指定位移主题分区,否则抛出异常
                if (responseStatus.size != 1 || !responseStatus.contains(offsetTopicPartition))
                throw new IllegalStateException("Append status %s should only have one partition %s"
                .format(responseStatus, offsetTopicPartition))


                val status = responseStatus(offsetTopicPartition)


                val responseError = group.inLock {
                //如果写入结果没有错误
                if (status.error == Errors.NONE) {
                //如果消费者组不是Dead状态
                if (!group.is(Dead)) {
                filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
                if (isTxnOffsetCommit)
                group.onTxnOffsetCommitAppend(producerId, topicPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
                else
                //调用GroupMetadata的onOffsetCommitAppend方法填充元数据
                // 也就是更新消费者组订阅的主题分区具体消费到哪个地方了,以及更新位移提交消息在_consumer_offsets主题中对应分区的偏移量
                group.onOffsetCommitAppend(topicPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
                }
                }
                Errors.NONE
                //如果写入结果存在异常
                } else {
                //且消费者组的状态不是Dead
                if (!group.is(Dead)) {
                if (!group.hasPendingOffsetCommitsFromProducer(producerId))
                removeProducerGroup(producerId, group.groupId)
                filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
                if (isTxnOffsetCommit)
                group.failPendingTxnOffsetCommit(producerId, topicPartition)
                else
                group.failPendingOffsetWrite(topicPartition, offsetAndMetadata)
                }
                }
                //确认写入结果中的异常类型
                status.error match {
                case Errors.UNKNOWN_TOPIC_OR_PARTITION
                | Errors.NOT_ENOUGH_REPLICAS
                | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
                Errors.COORDINATOR_NOT_AVAILABLE


                case Errors.NOT_LEADER_FOR_PARTITION
                | Errors.KAFKA_STORAGE_ERROR =>
                Errors.NOT_COORDINATOR


                case Errors.MESSAGE_TOO_LARGE
                | Errors.RECORD_LIST_TOO_LARGE
                | Errors.INVALID_FETCH_SIZE =>
                Errors.INVALID_COMMIT_OFFSET_SIZE


                case other => other
                }
                }
                }
                //利用异常类型构建提交返回状态
                val commitStatus = offsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
                if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
                (topicPartition, responseError)
                else
                (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
                }
                //调用回调函数
                responseCallback(commitStatus)
                }


                if (isTxnOffsetCommit) {
                group.inLock {
                addProducerGroup(producerId, group.groupId)
                group.prepareTxnOffsetCommit(producerId, offsetMetadata)
                }
                } else {
                group.inLock {
                group.prepareOffsetCommit(offsetMetadata)
                }
                }
                //写入消息到位移主题,同时调用putCacheCallback方法更新消费者元数据
                appendForGroup(group, entries, putCacheCallback)
                //如果不是Coordinator
                case None =>
                //构造NOT_COORDINATOR异常并提交给responseCallback方法
                val commitStatus = offsetMetadata.map { case (topicPartition, _) =>
                (topicPartition, Errors.NOT_COORDINATOR)
                }
                responseCallback(commitStatus)
                None
                }
                }
                }
                该方法的逻辑是:
                • 第一步:筛选出满足条件的待写入主题分区和已提交位移信息。这里的条件是:已提交位移对象中的 metadata 为 null ,或者长度小于服务端参数offset.metadata.max.bytes 的值,即 4 KB。这个 metadata 是自定义的元数据,字符串类型,如果没有指定则为 null。

                • 第二步:如果没有满足条件的待写入主题分区消费位移信息,则构造OFFSET_METADATA_TOO_LARGE异常,调用 responseCallback 并返回 None

                • 第三步:如果存在满足条件的待写入主题分区消费位移信息,则判断当前节点是否为给定消费者组的 GroupCoordinator ,如果不是则构造 NOT_COORDINATOR异常,调用 responseCallback 并返回 None

                • 第四步:如果当前节点是给定消费者组的 GroupCoordinator,则根据给定的主题分区和对应的消费位移信息构建已提交位移消息,并封装成待写入消息集合 MemoryRecords

                • 第五步:定义 putCacheCallback 回调函数并调用 appendForGroup 方法将上面封装好消息集合写入位移主题,并更新消费者组元数据

                    其中,回调函数 putCacheCallback 的作用就是更新消费者组元数据,即将多个消费者组位移值填充到 GroupMetadata 的 offsets 元数据缓存中。试想:当写入了新的已提交位移消息,说明消费者组的消费进度变更了,那么肯定要更新内存中消费者组元数据维护的已提交位移信息。这样消费者程序在获取消费位移时,才可以获取到最新的数据。
                该回调函数的逻辑如下:
                • 第一步:要确保位移消息写入到指定位移主题分区,否则就抛出异常。

                • 第二步:判断写入结果是否有错误。(这里先不考虑事务相关的处理)

                  • 如果没有错误,只要组状态不是 Dead 状态,就遍历给定的主题分区,调用 GroupMetadata 的 onOffsetCommitAppend 方法填充元数据。onOffsetCommitAppend 方法的主体逻辑,是将消费者组订阅分区的位移值写入到 offsets 字段保存的集合中。如果状态是 Dead,则什么都不做。

                  • 如果有错误,只要组状态不是 Dead 状态,就遍历给定的主题分区,通过 failPendingOffsetWrite 方法取消未完成的位移消息写入。也就是将主题分区从等待进行位移提交的分区集合中移除

                • 第三步:将日志写入的异常类型转换成表征提交状态错误的异常类型。然后再将转换后的异常封装进 commitStatus 字段中传给回调函数。

                • 第四步:调用回调函数返回

                3.2.2 查询消费者组位移信息
                getOffsets:该方法的作用是根据给定的消费者组名称,和订阅的主题分区列表,获取主题分区对应的消费位移,返回值是一个Map,key是主题分区对象,value是 OffsetFetchResponse.PartitionData 类型对象,里面封装了位移值,LeaderEpoch,metadata自定义元数据等
                  def getOffsets(groupId: String, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
                  trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
                  //第一步:获取给定消费者组的元数据信息
                  val group = groupMetadataCache.get(groupId)
                  //如果没有获取到,则返回空数据
                  if (group == null) {
                  topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
                  val partitionData = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                  Optional.empty(), "", Errors.NONE)
                  topicPartition -> partitionData
                  }.toMap
                  //第二步:如果获取到了
                  } else {
                  group.inLock {
                  //如果消费者组状态为Dead,同样返回空数据
                  if (group.is(Dead)) {
                  topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
                  val partitionData = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                  Optional.empty(), "", Errors.NONE)
                  topicPartition -> partitionData
                  }.toMap
                  //如果组状态不为Dead
                  } else {
                  //第三步:看是否指定了要读取的主题分区
                  topicPartitionsOpt match {
                  //如果没有指定
                  case None =>
                  // Return offsets for all partitions owned by this consumer group. (this only applies to consumers
                  // that commit offsets to Kafka.)
                  //获取指定消费者组对所有订阅主题分区的消费位移
                  group.allOffsets.map { case (topicPartition, offsetAndMetadata) =>
                  //按照主题分区封装PartitionData
                  topicPartition -> new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset,
                  offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
                  }
                  //第四步:如果指定了要读取的主题分区
                  case Some(topicPartitions) =>
                  //遍历指定的所有主题分区
                  topicPartitions.map { topicPartition =>
                  //获取对应的OffsetAndMetadata对象
                  val partitionData = group.offset(topicPartition) match {
                  //如果没有获取到,则返回空数据
                  case None =>
                  new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                  Optional.empty(), "", Errors.NONE)
                  //如果获取到了,根据获取的OffsetAndMetadata信息封装PartitionData对象
                  case Some(offsetAndMetadata) =>
                  new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset,
                  offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
                  }
                  topicPartition -> partitionData
                  }.toMap
                  }
                  }
                  }
                  }
                  }
                  该方法的逻辑是:
                  • 第一步:获取给定消费者组的元数据信息,如果没有获取到,返回空数据集

                  • 第二步:如果获取到了,判断消费者组的状态,如果是 Dead,说明消费者组已经被销毁了,位移数据也被视为不可用了,依然返回空数据集

                  • 第三步:如果消费者组状态不是 Dead,看方法调用时是否指定了主题分区。如果没有指定,那么就获取该消费者组订阅的所有主题分区的消费位移

                  • 第四步:如果指定了主题分区,则遍历这些分区,从消费者组元数据中获取该分区的消费位移信息。如果没有获取到,还是返回空数据集;如果获取到了,则将消费位移信息封装到 OffsetFetchResponse.PartitionData 对象中

                  • 第五步:返回结果集

                  3.3 读取位移主题消息,填充元数据缓存

                      当位移主题的某个分区在当前节点的副本成为 Leader 副本时,也就是当前节点的 GroupCoordinator 管理了该位移主题分区内所有注册的消费者组,这时就会调用 scheduleLoadGroupAndOffsets 方法。该方法会创建一个异步的任务,从位移主题中读取主题分区的消费位移。这里的主题分区指的是:给定的位移主题分区内注册的所有消费者组订阅的所有主题分区
                    def scheduleLoadGroupAndOffsets(offsetsPartition: Int, onGroupLoaded: GroupMetadata => Unit) {
                    //根据给定的分区号封装主题分区对象
                    val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
                    //如果将该分区号成功添加到 loadingPartitions 集合
                    if (addLoadingPartition(offsetsPartition)) {
                    info(s"Scheduling loading of offsets and group metadata from $topicPartition")
                    //创建调度任务,从位移主题中读取主题分区的消费位移
                    scheduler.schedule(topicPartition.toString, () => loadGroupsAndOffsets(topicPartition, onGroupLoaded))
                    } else {
                    info(s"Already loading offsets and group metadata from $topicPartition")
                    }
                    }
                    异步任务调用的是 loadGroupsAndOffsets 方法:
                      private[group] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit) {
                      try {
                      val startMs = time.milliseconds()
                      //最重要的逻辑都在这个方法里面
                      doLoadGroupsAndOffsets(topicPartition, onGroupLoaded)
                      info(s"Finished loading offsets and group metadata from $topicPartition in ${time.milliseconds() - startMs} milliseconds.")
                      } catch {
                      case t: Throwable => error(s"Error loading offsets from $topicPartition", t)
                      } finally {
                      inLock(partitionLock) {
                      //将分区添加到完成加载的分区集合
                      ownedPartitions.add(topicPartition.partition)
                      //并将分区从正在执行加载的分区集合中移除
                      loadingPartitions.remove(topicPartition.partition)
                      }
                      }
                      }
                          可以看到,真正读取位移主题中消费信息的,是在 doLoadGroupsAndOffsets 方法中进行的,这个方法的主要作用有两个:①加载消费者组元数据;②加载消费者组的消费位移
                        private def doLoadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit) {
                        //第一步:获取位移主题指定分区的LEO值,如果当前节点不是该分区的Leader副本,则返回-1
                        def logEndOffset: Long = replicaManager.getLogEndOffset(topicPartition).getOrElse(-1L)
                            ...
                            首先,在方法中定义了一个内部方法,用于获取位移主题给定分区的 LEO,如果当前节点不是该分区 Leader 副本所在的节点,则返回 -1.
                            接着,该方法会尝试获取指定分区的在当前节点的日志对象,如果没有获取到,只打印一条日志,如果获取到了,则执行后面的核心逻辑
                          //获取该分区本地副本日志
                          replicaManager.getLog(topicPartition) match {
                          //如果没有获取到,则只打印日志
                          case None =>
                          warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log")
                          //如果获取到了日志对象
                          case Some(log) =>
                          核心逻辑
                          ...
                          核心逻辑可以大致分为三个部分:
                          • 定义四个集合,用于存储读取位移主题时获取到的重要信息,然后读取位移主题的消息

                          • 处理读取到的消息,填充四个集合

                          • 处理四个集合中的数据

                          第一部分:读取位移主题
                            //获取分区日志的起始偏移量
                            var currOffset = log.logStartOffset
                            //申请一个ByteBuffer对象,容量为0
                            var buffer = ByteBuffer.allocate(0)
                            //定义已完成位移值加载的分区列表
                            val loadedOffsets = mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]()
                            //定义处于位移加载中的分区列表,只用于Kafka事务
                            val pendingOffsets = mutable.Map[Long, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]]()
                            //定义已完成组信息加载的消费者组列表
                            val loadedGroups = mutable.Map[String, GroupMetadata]()
                            //定义待移除的消费者组列表
                            val removedGroups = mutable.Set[String]()
                            //循环读取分区的所有消息
                            while (currOffset < logEndOffset && !shuttingDown.get()) {
                            //读取消息
                            val fetchDataInfo = log.read(currOffset, config.loadBufferSize, maxOffset = None,
                            minOneMessage = true, includeAbortedTxns = false)
                            //创建消息集合
                            val memRecords = fetchDataInfo.records match {
                            //如果读出的是MemoryRecords类型,直接返回
                            case records: MemoryRecords => records
                            //如果是FileRecords类型,需要转成MemoryRecords类型,然后返回
                            case fileRecords: FileRecords =>
                            val sizeInBytes = fileRecords.sizeInBytes
                            val bytesNeeded = Math.max(config.loadBufferSize, sizeInBytes)


                            if (buffer.capacity < bytesNeeded) {
                            if (config.loadBufferSize < bytesNeeded)
                            warn(s"Loaded offsets and group metadata from $topicPartition with buffer larger ($bytesNeeded bytes) than " +
                                        s"configured offsets.load.buffer.size (${config.loadBufferSize} bytes)")
                            buffer = ByteBuffer.allocate(bytesNeeded)
                            } else {
                            buffer.clear()
                                  }
                            fileRecords.readInto(buffer, 0)
                            MemoryRecords.readableRecords(buffer)
                            }
                            • 第一步:获取分区日志的起始偏移量,并申请一个容量为 0 的 ByteBuffer对象

                            • 第二步:定义四个集合:

                              • loadedOffsets:已完成位移值加载的分区列表

                              • pendingOffsets:位移值正在加载中的分区列表,只用于 Kafka 事务

                              • loadedGroups:已完成组信息加载的消费者组列表

                              • removedGroups:待移除的消费者组列表。

                            • 第三步:当读取偏移量小于 LEO,且组件未关闭,说明位移主题下该分区的消息没有读取完,不断地进行读取

                            • 第四步:根据读取到的消息创建内存中的消息集合对象,即 MemoryRecords 对象,如果读取到的是 FileRecords 类型,需要转成 MemoryRecords 类型。

                            第二部分:根据读取的数据,填充四个集合

                              //遍历消息集中的批次对象
                              memRecords.batches.asScala.foreach { batch =>
                              val isTxnOffsetCommit = batch.isTransactional
                                //控制类消息批次,属于事务的范畴
                              if (batch.isControlBatch) {
                                 ...
                              } else {
                              //定义一个变量,用于保存消息批次第一条消息的偏移量
                              var batchBaseOffset: Option[Long] = None
                              //遍历批次中的每条消息
                              for (record <- batch.asScala) {
                              require(record.hasKey, "Group metadata/offset entry key should not be null")
                              //填充起始消息偏移量
                              if (batchBaseOffset.isEmpty)
                              batchBaseOffset = Some(record.offset)
                              //根据消息的key,判断是哪种类型
                              GroupMetadataManager.readMessageKey(record.key) match {
                              //如果是已提交位移消息
                              case offsetKey: OffsetKey =>
                              if (isTxnOffsetCommit && !pendingOffsets.contains(batch.producerId))
                              pendingOffsets.put(batch.producerId, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]())
                              //从key中获取(消费者组名,主题,分区号)三元组
                              val groupTopicPartition = offsetKey.key
                              //如果消息没有value,也就是墓碑消息
                              if (!record.hasValue) {
                              if (isTxnOffsetCommit)
                              pendingOffsets(batch.producerId).remove(groupTopicPartition)
                              else
                              //由于是墓碑消息,所以这个主题分区对应的消息已经过期
                              //之前如果加载了,应该从已完成位移值加载的分区列表中移除
                              loadedOffsets.remove(groupTopicPartition)
                              //如果有value
                              } else {
                              //获取value中的信息
                              val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(record.value)
                              if (isTxnOffsetCommit)
                              pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata))
                              else
                              //将从消息中读取的主题分区消费信息放入已完成位移值加载的分区列表
                              loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata))
                              }
                              //如果是消费者组注册类消息
                              case groupMetadataKey: GroupMetadataKey =>
                              //获取消费者组名
                              val groupId = groupMetadataKey.key
                              //根据消息的value封装消费者组元数据对象
                              val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value, time)
                              //如果value有值
                              //TODO 由于是按照偏移量读取的,墓碑消息的偏移量会比对应过期消息的大,所有墓碑消息一定是后读取的
                              if (groupMetadata != null) {
                              //把该消费者组从待移除消费者组列表中移除
                              removedGroups.remove(groupId)
                              //把该消费者组添加到已完成组信息加载的列表
                              loadedGroups.put(groupId, groupMetadata)
                              //如果value为空,说明是墓碑消息
                              } else {
                              //把该消费者组从已完成组信息加载列表中移除
                              loadedGroups.remove(groupId)
                              //把该消费者组添加到待移除消费者组列表中
                              removedGroups.add(groupId)
                              }
                              //如果是未知类型的Key,抛出异常
                              case unknownKey =>
                              throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata")
                              }
                              }
                              }
                              //更新读取位置到消息批次最后一条消息的位移值+1
                              currOffset = batch.nextOffset
                              }

                              遍历消息集合中的批次对象,如果不是控制类的消息批次,执行下面的逻辑:

                              • 第一步:记录消息批次中第一条消息的偏移量

                              • 第二步:根据消息的 key,判断消息的类型:

                                • 如果是已提交位移消息,从 key 中或者(消费者组名称,订阅主题,分区号)三元组信息,然后判断是否有 value。如果没有,说明是墓碑消息,需要将该分区从已完成位移值加载的分区列表中移除;如果有,则将目标分区加入到已完成位移值加载的分区列表中。

                                • 如果是消费者组注册消息,先从 key 中获取消费者组名称,然后判断是否有 value。如果有,把该消费者组从待移除消费者组列表中移除,并加入到已完成加载的消费者组列表;如果没有,同样是墓碑消息,把该消费者组从已完成加载的消费者组列表中移除,并加入到待移除消费组列表。

                                • 如果是未知类型的 key,则直接抛异常

                              • 第三步:更新读取位置到消息批次的最后一条消息的偏移量 + 1

                              第三部分:处理四个集合
                                //对loadedOffsets 进行分组,将完成信息加载的组对应的消费者组位移值保存到groupOffsets
                                //将有消费者组消费位移,却没有消费者组信息的保存到emptyGroupOffsets
                                val (groupOffsets, emptyGroupOffsets) = loadedOffsets
                                //按照消费者组名进行分组
                                  .groupBy(_._1.group)
                                .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset) })
                                .partition { case (group, _) => loadedGroups.contains(group) }


                                val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]]()
                                //属于事务范畴
                                pendingOffsets.foreach { case (producerId, producerOffsets) =>
                                producerOffsets.keySet.map(_.group).foreach(addProducerGroup(producerId, _))
                                producerOffsets
                                .groupBy(_._1.group)
                                .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset)})
                                .foreach { case (group, offsets) =>
                                val groupPendingOffsets = pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
                                val groupProducerOffsets = groupPendingOffsets.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
                                groupProducerOffsets ++= offsets
                                }
                                }


                                val (pendingGroupOffsets, pendingEmptyGroupOffsets) = pendingOffsetsByGroup
                                .partition { case (group, _) => loadedGroups.contains(group)}
                                //遍历完成信息加载的组的元数据
                                loadedGroups.values.foreach { group =>
                                //提取消费者组的已提交位移
                                val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
                                val pendingOffsets = pendingGroupOffsets.getOrElse(group.groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
                                debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
                                //为已完成加载的组执行加载组操作
                                loadGroup(group, offsets, pendingOffsets)
                                //为已完成加载的组执行加载组操作之后的逻辑
                                onGroupLoaded(group)
                                }


                                (emptyGroupOffsets.keySet ++ pendingEmptyGroupOffsets.keySet).foreach { groupId =>
                                val group = new GroupMetadata(groupId, Empty, time)
                                //创建空的消费者组元数据
                                val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
                                val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
                                debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
                                //为空的消费者组执行加载组操作
                                loadGroup(group, offsets, pendingOffsets)
                                //为空的消费者执行加载组操作之后的逻辑
                                onGroupLoaded(group)
                                }
                                //处理removedGroups,检查 removedGroups中的所有消费者组,确保它们不能出现在消费者组元数据缓存中,否则将抛出异常。
                                removedGroups.foreach { groupId =>
                                if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId))
                                throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
                                s"loading partition $topicPartition")
                                }
                                • 第一步:对 loadedOffsets 进行分组,将完成信息加载的组对应的消费者组位移值保存到groupOffsets;将有消费者组消费位移,却没有消费者组信息的保存到emptyGroupOffsets

                                • 第二步:遍历完成信息加载的消费者组元数据,提取已提交消费位移,调用 loadGroup 方法,将获取到的已提交消费位移更新到 GroupMetadata 组元数据的 offsets 变量中,同时将这些消费者组元数据添加到 GroupMetadataManager的 groupMetadataCache 变量中(调用 addGroup 方法)。之后执行上层调用传入的 onGroupLoaded 函数,其作用是处理消费者组下所有成员的心跳超时设置,并指定下一次心跳的超时时间。

                                • 第三步:为 emptyGroupOffsets 中的所有消费者组,创建空的消费者组元数据,然后执行和上一步相同的组加载逻辑以及加载后的逻辑。

                                • 第四步:遍历待移除的消费者组列表,确保它们不能出现在消费者组元数据缓存中,否则将抛出异常。

                                • pendingOffsets属于事务范畴,这里不进行分析

                                总结:

                                1. 为了解决强依赖 zk 进行 rebalance 操作带来的问题,和统一协调消费者组中所有消费者成员,引入了 Coordinator 机制。

                                2. GroupMetadataManager 组件管理了当前节点 GroupCoordinator 管辖的所有消费者组元数据,并定义了一些列方法供 GroupCoordinator 调用

                                3. GroupMetadataManager 定义的方法主要分为三大类:

                                • 管理消费者组元数据

                                • 管理消费者组消费位移

                                • 读取位移主题消息,填充消费者组元数据缓存

                                4. 消费者程序获取主题分区的消费位移时,并不会直接读取位移主题,而是读取消费者组元数据缓存中的信息。该信息由 GroupCoordinator 读取位移主题并填充元数据缓存

                                文章转载自大数据记事本,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                评论