暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

深入理解Kafka服务端之消费者组元数据的管理

大数据记事本 2020-12-26
876
一、场景分析
    从这篇开始,分析 Kafka 消费相关的组件和原理。Kafka 消费主题的数据是以消费者组为单位的,同一个消费者组中的多个消费者同时消费同一主题的不同分区,以此来提高整体的消费能力。
    在服务端,存储了消费者组和组内消费者成员的元数据信息,以及对主题分区的消费进度等。这篇主要分析存储了哪些元数据信息,以及如何对元数据进行管理。
二、图示说明
服务端对消费者组元数据的管理主要分为以下四个方面:

三、源码分析

1. 服务端存储了消费者组的哪些元数据?

存储元数据相关的类有主要有两个:
  • GroupMetadata:存储消费者组的元数据
  • MemberMetadata:存储消费者组中成员的元数据
文件结构:
MemberMetadata.scala 文件定义了三个类和对象
  • MemberSummary 类:组成员概要数据,提取了最核心的元数据信息。上面例子中工具行命令返回的结果,就是这个类提供的数据。
  • MemberMetadata 伴生对象:仅仅定义了一个工具方法,供上层组件调用。
  • MemberMetadata 类:消费者组成员的元数据。Kafka 为消费者组成员定义了很多数据,一会儿我们将会详细学习。
MemberSummary 类:是一个样例类,仅仅定义了组成员的元数据种类
    case class MemberSummary(memberId: String,// 成员ID,由Kafka自动生成
    groupInstanceId: Option[String],// Consumer端参数group.instance.id值
    clientId: String,// client.id参数值
    clientHost: String,// Consumer端程序主机名
    metadata: Array[Byte],// 消费者组成员使用的分配策略
    assignment: Array[Byte])// 成员订阅分区
    其中各个属性的定义如下:
    • memberId:标识向消费者组成员ID,由 Kafka 自动生成,规则是 consumer- 组 ID-< 序号 >-
    • groupInstanceId:消费者组静态成员的 ID。静态成员机制的引入能够规避不必要的消费者组 Rebalance 操作。属于高阶功能,可以看官网 group.instance.id 参数的说明。
    • clientId:消费者组成员配置的 client.id 参数。由于 memberId 不能被设置,可以在创建消费者实例时指定这个参数来标识消费者组成员。
    • clientHost:运行消费者程序的主机名。它记录了这个客户端是从哪台机器发出的消费请求。
    • metadata:标识消费者组成员分区分配策略的字节数组,由消费者端参数 partition.assignment.strategy 值设定,默认的 RangeAssignor 策略是按照主题平均分配分区。
    • assignment:保存分配给该消费者成员的订阅分区。每个消费者组都要选出一个 Leader 消费者组成员,负责给所有成员分配消费方案。之后,Kafka 将制定好的分配方案序列化成字节数组,赋值给 assignment,分发给各个成员。

    MemberMetadata 伴生对象

      private object MemberMetadata {
      // 提取分区分配策略集合
      def plainProtocolSet(supportedProtocols: List[(String, Array[Byte])]) = supportedProtocols.map(_._1).toSet
      }
      该伴生对象只定义了一个方法,用来提取分区分配策略的集合。
      假设消费者组下有 3 个成员,它们的分区分配策略设置分别为 [ A ]、[ A,B ]、[ B ],那么该方法返回 [ A,B ]。也就是该消费者组定义的所有分区分配策略的集合。
      MemberMetadata  类
        private[group] class MemberMetadata(var memberId: String,
        val groupId: String,
        val groupInstanceId: Option[String],
        val clientId: String,
        val clientHost: String,
        val rebalanceTimeoutMs: Int,// Rebalane操作超时时间
        val sessionTimeoutMs: Int,// 会话超时时间
        val protocolType: String,// 对消费者组而言,是"consumer"
        var supportedProtocols: List[(String, Array[Byte])]// 成员配置的多套分区分配策略
        ) {
        // 分区分配方案
        var assignment: Array[Byte] = Array.empty[Byte]
        //表示组成员是否正在等待加入组。
        var awaitingJoinCallback: JoinGroupResult => Unit = null
        //表示组成员是否正在等待 GroupCoordinator 发送分配方案。
        var awaitingSyncCallback: SyncGroupResult => Unit = null
        //表示组成员是否发起“退出组”的操作。
        var isLeaving: Boolean = false
        //表示是否是消费者组下的新成员。
        var isNew: Boolean = false
        val isStaticMember: Boolean = groupInstanceId.isDefined
        //用于标识心跳是否完成,当接收到心跳时为true,当超时未接到到心跳,设置为false
        var heartbeatSatisfied: Boolean = false
        ...
        }
        相比于 MemberSummary  类的定义,该类在定义时多了几个属性,含义如下:
        • rebalanceTimeoutMs:Rebalance 操作的超时时间,即一次 Rebalance 操作必须在这个时间内完成,否则被视为超时。这个字段的值是 Consumer 端参数 max.poll.interval.ms 的值。
        • sessionTimeoutMs:会话超时时间。当前消费者组成员依靠心跳机制“保活”。如果在会话超时时间之内未能成功发送心跳,组成员就被判定成“下线”,从而触发新一轮的 Rebalance。这个字段的值是 Consumer 端参数 session.timeout.ms 的值。
        • protocolType:协议类型。它实际上标识的是消费者组被用在了哪个场景。这里的场景具体有两个:第一个是作为普通的消费者组使用,该字段对应的值就是 consumer;第二个是供 Kafka Connect 组件中的消费者使用,该字段对应的值是 connect。
        • supportedProtocols:标识成员配置的多组分区分配策略。目前,Consumer 端参数 partition.assignment.strategy 的类型是 List,说明可以为消费者组成员设置多组分配策略,因此,这个字段也是一个 List 类型,每个元素是一个元组(Tuple)。元组的第一个元素是策略名称,第二个元素是序列化后的策略详情。

        除此之外,还定义了几个重要的变量:

        • assignment:保存分配给该成员的分区分配方案。
        • awaitingJoinCallback:表示组成员是否正在等待加入组。
        • awaitingSyncCallback:表示组成员是否正在等待 GroupCoordinator 发送分配方案。
        • isLeaving:表示组成员是否发起“退出组”的操作。
        • isNew:表示是否是消费者组下的新成员。
        • heartbeatSatisfied:标识心跳是否完成,当接收到心跳时为true,当超时未接到到心跳,设置为false

        GroupMetadata.scala 文件由6个部分组成

        • GroupState 特质及实现对象:定义了消费者组的状态。当前消费者组有 5 个状态:
          • Empty:表示当前无成员的消费者组
          • PreparingRebalance:表示正在执行加入组操作的消费者组
          • CompletingRebalance:表示等待 Leader 成员制定分配方案的消费者组
          • Stable:表示已完成 Rebalance 操作可正常工作的消费者组
          • Dead:表示当前无成员且元数据信息被删除的消费者组。
        • GroupMetadata 类:组元数据类。该 scala 文件下最重要的类文件。
        • GroupMetadata 伴生对象:该对象定义了 loadGroup 方法用于创建 GroupMetadata 实例,并定义了用于存储消费者组合法前置状态的变量 validPreviousStates
        • GroupOverview 类:定义了非常简略的消费者组概览信息。
        • GroupSummary 类:与 MemberSummary 类类似,它定义了消费者组的概要信息。
        • CommitRecordMetadataAndOffset 类:保存写入到位移主题中的消息的位移值,以及其他元数据信息。这个类的主要职责就是保存位移值。
        GroupState 及其实现对象:
          private[group] sealed trait GroupState


          private[group] case object PreparingRebalance extends GroupState


          ...
          定义都十分简单。
              每个实现对象的注释中,都指定了该状态可以转换到的下一个状态及转换的时机,以 PreparingRebalance 状态为例,其注释中 transition 的内容如下:
            /**
            * transition: some members have joined by the timeout => CompletingRebalance
            * all members have left the group => Empty
            * group is removed by partition emigration => Dead
            */

            这里说明了 PreparingRebalance 状态转换的时机如下:

            • 当有成员加入时,转换为 CompletingRebalance 状态

            • 当所有组成员退出时,转换为 Empty 状态

            • 当消费者组被移除时,转换为 Dead 状态

                一个消费者组从创建到正常工作,它的状态流转路径是:Empty -> PreparingRebalance -> CompletingRebalance -> Stable。

            GroupOverview 类:

                定义了消费者组的概览信息,比如执行 kafka-consumer-groups.sh --list 的时候,Kafka 就会创建 GroupOverview 实例返回给命令行。

              case class GroupOverview(groupId: String,//消费者组id,即:group.id 参数的值
              protocolType: String//协议类型
              )

              GroupSummary 类:

              同样定义了消费者组的概览信息:

                case class GroupSummary(state: String,//消费者组状态
                protocolType: String,//协议类型
                protocol: String,//消费者组选定的分区分配策略
                members: List[MemberSummary]//消费者组成员元数据
                )

                GroupMetadata 伴生对象

                  private object GroupMetadata {
                  //用于存储各种状态的合法前置状态
                  private val validPreviousStates: Map[GroupState, Set[GroupState]] =
                  Map(Dead -> Set(Stable, PreparingRebalance, CompletingRebalance, Empty, Dead),
                  CompletingRebalance -> Set(PreparingRebalance),
                  Stable -> Set(CompletingRebalance),
                  PreparingRebalance -> Set(Stable, CompletingRebalance, Empty),
                  Empty -> Set(PreparingRebalance))
                  ...
                  }

                  validPreviousStates:用于存储消费者组各个状态的合法前置状态。key 是状态,value 是该状态对应的合法前置状态的集合

                  GroupMetadata 类:

                    private[group] class GroupMetadata(val groupId: String, //消费者组id
                    initialState: GroupState, //消费者组初始状态
                    time: Time) extends Logging {
                    type JoinCallback = JoinGroupResult => Unit


                    private[group] val lock = new ReentrantLock
                    // 组状态
                    private var state: GroupState = initialState
                    // 记录状态最近一次变更的时间戳
                    var currentStateTimestamp: Option[Long] = Some(time.milliseconds())
                    //协议类型
                    var protocolType: Option[String] = None
                    //记录rebalance的次数
                    var generationId = 0
                    // 记录消费者组的Leader成员,可能不存在
                    private var leaderId: Option[String] = None
                    //分区分配策略
                    private var protocol: Option[String] = None
                    // 成员元数据列表信息
                    private val members = new mutable.HashMap[String, MemberMetadata]
                    // Static membership mapping [key: group.instance.id, value: member.id]
                    // 静态成员Id列表
                    private val staticMembers = new mutable.HashMap[String, String]
                    private val pendingMembers = new mutable.HashSet[String]
                    private var numMembersAwaitingJoin = 0
                    // 分区分配策略支持票数
                    private val supportedProtocols = new mutable.HashMap[String, Integer]().withDefaultValue(0)
                    // 保存消费者组订阅分区的提交位移值
                    private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]
                    private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
                    private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]()
                    private var receivedTransactionalOffsetCommits = false
                    private var receivedConsumerOffsetCommits = false


                    var newMemberAdded: Boolean = false
                    ...
                    }
                    其中几个重要变量的含义如下:
                    • currentStateTimestamp:记录最近一次状态变更的时间戳,用于确定位移主题中的过期消息。位移主题中的消息也要遵循 Kafka 的留存策略,所有当前时间与该字段的差值超过了留存阈值的消息都被视为“已过期”(Expired)。
                    • generationId:消费者组 Generation 号。Generation 等同于消费者组执行过 Rebalance 操作的次数,每次执行 Rebalance 时,Generation 数都要加 1。
                    • leaderId:消费者组中 Leader 成员的 Member ID 信息。当消费者组执行 Rebalance 过程时,需要选举一个成员作为 Leader,负责为所有成员制定分区分配方案。在 Rebalance 早期阶段,这个 Leader 可能尚未被选举出来。
                    • members:保存消费者组下所有成员的元数据信息。Key 是成员的 member ID 字段,Value 是 MemberMetadata 类型,保存了成员的元数据信息。
                    • offsets:保存按照主题分区分组的位移主题消息位移值的 HashMap。Key 是主题分区,Value 是前面讲过的 CommitRecordMetadataAndOffset 类型。当消费者组成员向 Kafka 提交位移时,会向这个字段插入对应的记录。
                    • supportedProtocols:保存分区分配策略的支持票数。HashMap 类型,Key 是分配策略的名称,Value 是支持的票数。每个成员可以选择多个分区分配策略,因此,假设成员 A 选择[“range”,“round-robin”]、B 选择[“range”]、C 选择[“round-robin”,“sticky”],那么这个字段就有 3 项,分别是:<“range”,2>、<“round-robin”,2> 和 <“sticky”,1>。
                    2. 服务端如何管理消费者组相关的元数据
                    2.1对消费者组状态的管理
                      //判断消费者组的当前状态是否为指定状态
                      def is(groupState: GroupState) = state == groupState


                      //判断消费者组的当前状态不是指定状态
                      def not(groupState: GroupState) = state != groupState


                      //获取消费者组当前状态
                      def currentState = state


                      //消费者组能否Rebalance的条件是当前状态是PreparingRebalance状态的合法前置状态
                      def canRebalance = GroupMetadata.validPreviousStates(PreparingRebalance).contains(state)


                      //设置/更新消费者组状态
                      def transitionTo(groupState: GroupState) {
                      //验证目标状态的合法性
                      assertValidTransition(groupState)
                      //更新/设置消费者组状态
                      state = groupState
                      //设置更新状态的时间戳
                      currentStateTimestamp = Some(time.milliseconds())
                      }
                      • transitionTo 方法:作用是将组状态变更成给定状态。在变更前,需要验证目标状态的合法性,依据就是看 GroupMetadata 伴生对象中定义的各个状态的合法前置状态,如果当前状态是目标状态的前置状态,则视为合法的状态转换。
                        然后更新状态变更的时间戳,Kafka 有个定时任务,会定期清除过期的消费者组位移数据,它就是依靠这个时间戳字段,来判断过期与否的。
                      • canRebalance 方法:用于判断组是否能够开启 Rebalance 操作。判断依据是,当前状态是否是 PreparingRebalance 状态的合法前置状态。只有 Stable、CompletingRebalance 和 Empty 这 3 类状态的消费者组,才有资格开启 Rebalance。
                      • is 和 not 方法:分别判断组的状态与给定状态是否一致,主要被用于执行状态校验。
                      2.2 对消费者组成员的管理
                          对成员的管理分为添加成员(add 方法)、移除成员(remove 方法)和查询成员(has、get、size 方法等),这里主要操作的就是 members 变量:
                      添加成员:add
                        def add(member: MemberMetadata, callback: JoinCallback = null) {
                        //如果members为空,说明添加的是第一个消费者
                        if (members.isEmpty)
                        // 就把该成员的protocolType设置为消费者组的protocolType
                        this.protocolType = Some(member.protocolType)
                        // 确保成员元数据中的groupId和组Id相同
                        assert(groupId == member.groupId)
                        // 确保成员元数据中的protoclType和组protocolType相同
                        assert(this.protocolType.orNull == member.protocolType)
                        // 确保该成员选定的分区分配策略与组选定的分区分配策略相匹配
                        assert(supportsProtocols(member.protocolType, MemberMetadata.plainProtocolSet(member.supportedProtocols)))
                        // 如果尚未选出Leader成员
                        if (leaderId.isEmpty)
                        // 把该成员设定为Leader成员
                        leaderId = Some(member.memberId)
                        // 将该成员添加进members
                        members.put(member.memberId, member)
                        // 更新分区分配策略支持票数
                        member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) += 1 }
                        // 设置成员加入组后的回调逻辑
                        member.awaitingJoinCallback = callback
                        // 更新已加入组的成员数
                        if (member.isAwaitingJoin)
                        numMembersAwaitingJoin += 1
                        }
                        该方法的逻辑是:
                        • 第一步:判断当前组是否为空,如果为空,说明待添加的成员是该组的第一个成员,则把该成员的protocolType 设置为组的 protocolType。对于普通的消费者,protocolType 就是字符串"consumer"。
                        • 第二步:验证待加入成员的元数据的合法性,包括 groupId 组ID、protocolType 以及该成员选定的分区分配策略是否与组选定的分区分配策略相匹配。
                        • 第三步:判断是否选出 Leader 成员,如果没有则把待加入成员设置为 Leader 成员。这里的 Leader 成员,是指消费者组下的一个成员。该成员负责为所有组成员制定分区分配方案,分配依据就是消费者组选定的分区分配策略。
                        • 第四步:将该成员加入组,即添加到 members 变量中
                        • 第五步:更新消费者组分区分配策略支持的票数。
                        • 第六步:设置成员加入组后的回调逻辑
                        • 第七步:更新已经加入组的成员的数量

                        移除成员:remove

                          def remove(memberId: String) {
                          //第一步:将成员从 members 变量中移除
                          members.remove(memberId).foreach { member =>
                          //第二步:更新分区分配策略支持票数
                          member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) -= 1 }
                          //第三步:更新组中成员数量
                          if (member.isAwaitingJoin)
                          numMembersAwaitingJoin -= 1
                          }
                          //第四步:如果该成员是Leader,则重选Leader成员,策略就是选剩余成员列表中的第一个为Leader
                          if (isLeader(memberId))
                          leaderId = members.keys.headOption
                          }
                          该方法的逻辑相对简单:
                          • 第一步:将成员从 members 变量中移除
                          • 第二步:更新分区分配策略支持票数
                          • 第三步:更新组中成员数量
                          • 第四步:如果该成员是Leader,则重选Leader成员,策略就是选剩余成员列表中的第一个为Leader
                          查询成员:has、get、size 等
                            //判断给定成员是否在组中
                            def has(memberId: String) = members.contains(memberId)
                            //根据成员ID获取成员
                            def get(memberId: String) = members(memberId)
                            //或者组成员的数量
                            def size = members.size

                            2.3 偏移量(位移)管理

                                管理消费者组的提交位移(Committed Offsets),包括添加和移除位移,主要操作的就是 GroupMetadata.offsets 变量
                              / 保存消费者组订阅分区的提交位移值
                              private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]

                                  该变量的 Key 是 TopicPartition 类型,表示一个主题分区, Value 是 CommitRecordMetadataAndOffset 类型。该类封装了位移提交消息的位移值。

                                case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offsetAndMetadata: OffsetAndMetadata) {
                                def olderThan(that: CommitRecordMetadataAndOffset): Boolean = appendedBatchOffset.get < that.appendedBatchOffset.get
                                }
                                其包含两个属性:
                                • appendedBatchOffset:保存的是位移主题消息自己的位移值
                                • offsetAndMetadata:保存的是位移提交消息中保存的消费者组的位移值
                                什么叫位移提交消息?
                                    Kafka 中消费者组会向 Coordinator 提交已经消费消息的进度,这个进度称为提交位移,标识的是消费者组要消费的下一条消息的偏移量。之后这个提交位移会被封装成一个消息,保存在内部主题_consumer_offsets 中。位移提交消息就是封装之后进行存储的消息对象。内部主题和普通主题一样,会给每条消息分配一个偏移量(或者叫位移值),这条消息的偏移量就是 appendedBatchOffset 的值,而 offsetAndMetadata 指的是封装成消息中的消费者组的消费进度。

                                添加位移值

                                    在 GroupMetadata 中,有 3 个向 offsets 中添加订阅分区的已消费位移值的方法,分别如下:
                                ① initializeOffsets
                                  def initializeOffsets(offsets: collection.Map[TopicPartition, CommitRecordMetadataAndOffset],
                                  pendingTxnOffsets: Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]) {
                                  //将给定的一组订阅分区提交位移值加到 offsets 中
                                  this.offsets ++= offsets
                                  //更新 pendingTransactionalOffsetCommits
                                  this.pendingTransactionalOffsetCommits ++= pendingTxnOffsets
                                  }

                                      当消费者组的协调者组件启动时,它会创建一个异步任务,定期地读取位移主题(_consumer_offsets)中相应消费者组的提交位移数据,并把它们加载到 offsets 字段中

                                  ② onOffsetCommitAppend

                                    def onOffsetCommitAppend(topicPartition: TopicPartition, offsetWithCommitRecordMetadata: CommitRecordMetadataAndOffset) {
                                    if (pendingOffsetCommits.contains(topicPartition)) {
                                    if (offsetWithCommitRecordMetadata.appendedBatchOffset.isEmpty)
                                    throw new IllegalStateException("Cannot complete offset commit write without providing the metadata of the record " +
                                    "in the log.")
                                    // offsets字段中没有该分区位移提交数据,或者
                                    // offsets字段中该分区对应的提交位移消息在位移主题中的位移值小于待写入的位移值
                                    if (!offsets.contains(topicPartition) || offsets(topicPartition).olderThan(offsetWithCommitRecordMetadata))
                                    // 将该分区对应的提交位移消息添加到offsets中
                                    offsets.put(topicPartition, offsetWithCommitRecordMetadata)
                                    }


                                    pendingOffsetCommits.get(topicPartition) match {
                                    case Some(stagedOffset) if offsetWithCommitRecordMetadata.offsetAndMetadata == stagedOffset =>
                                    pendingOffsetCommits.remove(topicPartition)
                                    case _ =>
                                    // The pendingOffsetCommits for this partition could be empty if the topic was deleted, in which case
                                    // its entries would be removed from the cache by the `removeOffsets` method.
                                    }
                                    }

                                        该方法用于将提交位移消息的位移值写入 offsets 变量。执行写入的判断依据是:

                                    • offsets 中不包含该主题分区对应的消息值

                                    • offsets 中该分区对应的提交位移消息在位移主题中的位移值小于待写入的位移值。

                                        如果是的话,就把该主题已提交位移消息的位移值添加到 offsets 中。

                                    ③ completePendingTxnOffsetCommit

                                        该方法的作用是完成一个待决事务(Pending Transaction)的位移提交。这里暂时不做分析

                                    移除位移值

                                        由于 _consumer_offsets 主题会将存储时间超过阈值的消息进行清理,即当前时间与已提交位移消息的时间戳的差值,超过了 Broker 端参数 offsets.retention.minutes 值,那么该消息会被清理,同时要移除 offsets 中该消息对应的数据。

                                        GroupMetadata.removeExpiredOffsets 方法就是用来移除 offsets 中已清理提交位移消息的数据的,其逻辑如下:

                                      def removeExpiredOffsets(currentTimestamp: Long, offsetRetentionMs: Long): Map[TopicPartition, OffsetAndMetadata] = {
                                      //用于获取订阅分区过期的位移值
                                      def getExpiredOffsets(baseTimestamp: CommitRecordMetadataAndOffset => Long): Map[TopicPartition, OffsetAndMetadata] = {
                                      // 遍历offsets中的所有分区,过滤出同时满足以下2个条件的分区
                                      // 条件1:该主题分区已经完成位移提交
                                      // 条件2:该主题分区在位移主题中对应消息的存在时间超过了阈值
                                      offsets.filter {
                                      case (topicPartition, commitRecordMetadataAndOffset) =>
                                      !pendingOffsetCommits.contains(topicPartition) && {
                                      //获取位移消息中的时间戳
                                      commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp match {
                                      //如果没有时间戳
                                      case None =>
                                      // current version with no per partition retention
                                      currentTimestamp - baseTimestamp(commitRecordMetadataAndOffset) >= offsetRetentionMs
                                      //如果获取到了时间戳
                                      case Some(expireTimestamp) =>


                                      //判断条件就是方法传入的currentTimestamp大于等于消息中的时间戳
                                      currentTimestamp >= expireTimestamp
                                      }
                                      }
                                      }.map {
                                      case (topicPartition, commitRecordOffsetAndMetadata) =>
                                      //将位移值对象返回
                                      (topicPartition, commitRecordOffsetAndMetadata.offsetAndMetadata)
                                      }.toMap
                                      }
                                      // 调用getExpiredOffsets方法获取主题分区的过期位移
                                      val expiredOffsets: Map[TopicPartition, OffsetAndMetadata] = protocolType match {
                                      case Some(_) if is(Empty) =>
                                      //传入更新为当前组状态的时间戳
                                      getExpiredOffsets(commitRecordMetadataAndOffset =>
                                      currentStateTimestamp.getOrElse(commitRecordMetadataAndOffset.offsetAndMetadata.commitTimestamp))
                                      //如果没有定义协议类型
                                      case None =>


                                      getExpiredOffsets(_.offsetAndMetadata.commitTimestamp)


                                      case _ =>
                                      Map()
                                      }


                                      if (expiredOffsets.nonEmpty)
                                      debug(s"Expired offsets from group '$groupId': ${expiredOffsets.keySet}")
                                      //将过期位移对应的主题分区从offsets中移除
                                      offsets --= expiredOffsets.keySet
                                      //返回主题分区对应的过期位移
                                      expiredOffsets
                                      }
                                      • 第一步:定义了一个内部方法 getExpiredOffsets 用于获取订阅主题分区过期的位移值。过期位移的判定条件有两个,需要同时满足:
                                        • 该主题分区已经完成位移提交,即该主题分区不在 pendingOffsetCommits 结构中
                                        • 该主题分区在位移主题中对应消息的存在时间超过了阈值。这里会获取消息中存储的过期时间戳,如果获取到了,直接判断当前时间戳是否大于等于消息中的过期时间戳。如果没有获取到,则获取当前时间和当前组状态的更新时间的差值,判断其是否大于等于 offsets.retention.minutes 的值。
                                      • 第二步:根据 protocolType 和组状态,向第一步定义的方法中传入对应的参数,来获取主题分区的过期位移
                                      • 如果消费者组状态是 Empty,就传入组变更为 Empty 状态的时间,若该时间没有被记录,则使用提交位移消息本身的写入时间戳,来获取过期位移
                                      • 如果 protocolType 为 None,就表示,这个消费者组其实是一个 Standalone 消费者,依然使用提交位移消息本身的写入时间戳,来决定过期位移值
                                      • 如果消费者组的状态不符合刚刚说的这些情况,那就说明,没有过期位移值需要被移除。
                                      • 第三步:将过期位移对应的主题分区从 offsets 中移除
                                      • 第四步:返回主题分区对应的过期位移

                                      2.4分区分配策略的管理

                                          由于每个消费者都可以设置自己的分区分配策略(可以指定多个),而且多个消费者设置的分配策略不一定相同,那么对于消费者组来说,如果确定分区分配策略呢?
                                          这里就用到了 supportedProtocols 变量,上面提到这个变量的作用是记录分区分配策略的票数,该票数在成员数量发生变化时也会进行相应的调整
                                      GroupMetadata 类中和确定分区分配策略相关的方法有:
                                      ① candidateProtocols:找出组内所有成员都支持的分区分配策略集
                                        private def candidateProtocols = {
                                        //找出支持票数=总成员数的策略,返回它们的名称
                                        val numMembers = members.size
                                        supportedProtocols.filter(_._2 == numMembers).map(_._1).toSet
                                        }

                                        该方法会先获取组的成员个数,然后获取票数和成员个数一致的分区分配策略,也就是所有的成员都支持的分配策略

                                        ② selectProtocol:用于投票选举出公认的分区分配策略

                                          def selectProtocol: String = {
                                          //如果没有成员,抛出异常
                                          if (members.isEmpty)
                                          throw new IllegalStateException("Cannot select protocol for empty group")


                                          // select the protocol for this group which is supported by all members
                                          //获取所有成员都支持的分配策略
                                          val candidates = candidateProtocols


                                          // let each member vote for one of the protocols and choose the one with the most votes
                                          //让每个成员投票,票数最多的那个策略当选
                                          val votes: List[(String, Int)] = allMemberMetadata
                                          .map(_.vote(candidates))
                                          .groupBy(identity)
                                          .mapValues(_.size)
                                          .toList


                                          votes.maxBy(_._2)._1
                                          }
                                          该方法的逻辑是:
                                          • 第一步:判断组中是否有成员,如果没有则直接抛异常
                                          • 第二步:调用上面的 candidateProtocols 获取所有成员都支持的分区分配策略
                                          • 第三步:让每个成员对分区分配策略进行投票,票数最多的当选
                                          其中投票的 vote 方法的实现如下:
                                            def vote(candidates: Set[String]): String = {
                                            //找到当前成员配置的分区分配策略中,第一个在给定的分区分配策略中的
                                            supportedProtocols.find({ case (protocol, _) => candidates.contains(protocol)}) match {
                                            case Some((protocol, _)) => protocol
                                            case None =>
                                            throw new IllegalArgumentException("Member does not support any of the candidate protocols")
                                            }
                                            }
                                            其投票的原理是:找到当前成员配置的分区分配策略中,第一个在给定的分区分配策略集合中的。假设给定的分区分配策略集合为 [ A,B,C ],当前成员配置的分配策略为 [ B,C ],那么就投 B 策略一票;而如果当前成员配置的分配策略为 [ C,B ],就投 C 策略一票。
                                            然后按照投票结果进行分组,投票最多的分配策略当选

                                            总结:

                                            1. 服务端存储消费者组元数据的类主要有两个:
                                            • GroupMetadata:存储消费者组的元数据
                                            • MemberMetadata:存储消费者组中成员的元数据
                                            2. 消费者组元数据主要包括:
                                            • currentStateTimestamp:最近状态变更的时间戳
                                            • generationId:执行过 Rebalance 操作的次数
                                            • leaderId:消费者组中 Leader 成员的 Member ID 信息
                                            • members:保存消费者组下所有成员的元数据信息。
                                            • offsets:保存主题分区和位移主题消息位移值的对应关系
                                            • supportedProtocols:保存消费者组中分区分配策略的投票数
                                            3. 组成员的元数据主要包括:
                                            • memberId:标识消费者组成员ID
                                            • groupInstanceId:消费者组静态成员的 ID
                                            • clientId:消费者组成员配置的 client.id 参数
                                            • clientHost:运行消费者程序的主机名
                                            • metadata:消费者组成员分区分配策略的字节数组
                                            • assignment:保存分配给该消费者成员的订阅分区
                                            • rebalanceTimeoutMs:Rebalance 操作的超时时间
                                            • sessionTimeoutMs:会话超时时间
                                            • protocolType:协议类型
                                            • supportedProtocols:成员配置的多组分区分配策略
                                            4.服务端对消费者组元数据的管理主要分四个方面:
                                            • 对消费者组状态的管理
                                            • 对消费者组成员的管理
                                            • 对消费位移的管理
                                            • 对分区分配策略的管理

                                            参考:极客时间《Kafka核心源码解读》

                                            文章转载自大数据记事本,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                            评论