
三、源码分析
1. 服务端存储了消费者组的哪些元数据?
GroupMetadata:存储消费者组的元数据 MemberMetadata:存储消费者组中成员的元数据
MemberSummary 类:组成员概要数据,提取了最核心的元数据信息。上面例子中工具行命令返回的结果,就是这个类提供的数据。 MemberMetadata 伴生对象:仅仅定义了一个工具方法,供上层组件调用。 MemberMetadata 类:消费者组成员的元数据。Kafka 为消费者组成员定义了很多数据,一会儿我们将会详细学习。
case class MemberSummary(memberId: String,// 成员ID,由Kafka自动生成groupInstanceId: Option[String],// Consumer端参数group.instance.id值clientId: String,// client.id参数值clientHost: String,// Consumer端程序主机名metadata: Array[Byte],// 消费者组成员使用的分配策略assignment: Array[Byte])// 成员订阅分区
memberId:标识向消费者组成员ID,由 Kafka 自动生成,规则是 consumer- 组 ID-< 序号 >- groupInstanceId:消费者组静态成员的 ID。静态成员机制的引入能够规避不必要的消费者组 Rebalance 操作。属于高阶功能,可以看官网 group.instance.id 参数的说明。 clientId:消费者组成员配置的 client.id 参数。由于 memberId 不能被设置,可以在创建消费者实例时指定这个参数来标识消费者组成员。 clientHost:运行消费者程序的主机名。它记录了这个客户端是从哪台机器发出的消费请求。 metadata:标识消费者组成员分区分配策略的字节数组,由消费者端参数 partition.assignment.strategy 值设定,默认的 RangeAssignor 策略是按照主题平均分配分区。 assignment:保存分配给该消费者成员的订阅分区。每个消费者组都要选出一个 Leader 消费者组成员,负责给所有成员分配消费方案。之后,Kafka 将制定好的分配方案序列化成字节数组,赋值给 assignment,分发给各个成员。
MemberMetadata 伴生对象
private object MemberMetadata {// 提取分区分配策略集合def plainProtocolSet(supportedProtocols: List[(String, Array[Byte])]) = supportedProtocols.map(_._1).toSet}
private[group] class MemberMetadata(var memberId: String,val groupId: String,val groupInstanceId: Option[String],val clientId: String,val clientHost: String,val rebalanceTimeoutMs: Int,// Rebalane操作超时时间val sessionTimeoutMs: Int,// 会话超时时间val protocolType: String,// 对消费者组而言,是"consumer"var supportedProtocols: List[(String, Array[Byte])]// 成员配置的多套分区分配策略) {// 分区分配方案var assignment: Array[Byte] = Array.empty[Byte]//表示组成员是否正在等待加入组。var awaitingJoinCallback: JoinGroupResult => Unit = null//表示组成员是否正在等待 GroupCoordinator 发送分配方案。var awaitingSyncCallback: SyncGroupResult => Unit = null//表示组成员是否发起“退出组”的操作。var isLeaving: Boolean = false//表示是否是消费者组下的新成员。var isNew: Boolean = falseval isStaticMember: Boolean = groupInstanceId.isDefined//用于标识心跳是否完成,当接收到心跳时为true,当超时未接到到心跳,设置为falsevar heartbeatSatisfied: Boolean = false...}
rebalanceTimeoutMs:Rebalance 操作的超时时间,即一次 Rebalance 操作必须在这个时间内完成,否则被视为超时。这个字段的值是 Consumer 端参数 max.poll.interval.ms 的值。 sessionTimeoutMs:会话超时时间。当前消费者组成员依靠心跳机制“保活”。如果在会话超时时间之内未能成功发送心跳,组成员就被判定成“下线”,从而触发新一轮的 Rebalance。这个字段的值是 Consumer 端参数 session.timeout.ms 的值。 protocolType:协议类型。它实际上标识的是消费者组被用在了哪个场景。这里的场景具体有两个:第一个是作为普通的消费者组使用,该字段对应的值就是 consumer;第二个是供 Kafka Connect 组件中的消费者使用,该字段对应的值是 connect。 supportedProtocols:标识成员配置的多组分区分配策略。目前,Consumer 端参数 partition.assignment.strategy 的类型是 List,说明可以为消费者组成员设置多组分配策略,因此,这个字段也是一个 List 类型,每个元素是一个元组(Tuple)。元组的第一个元素是策略名称,第二个元素是序列化后的策略详情。
除此之外,还定义了几个重要的变量:
assignment:保存分配给该成员的分区分配方案。 awaitingJoinCallback:表示组成员是否正在等待加入组。 awaitingSyncCallback:表示组成员是否正在等待 GroupCoordinator 发送分配方案。 isLeaving:表示组成员是否发起“退出组”的操作。 isNew:表示是否是消费者组下的新成员。 heartbeatSatisfied:标识心跳是否完成,当接收到心跳时为true,当超时未接到到心跳,设置为false
GroupMetadata.scala 文件由6个部分组成
GroupState 特质及实现对象:定义了消费者组的状态。当前消费者组有 5 个状态: Empty:表示当前无成员的消费者组 PreparingRebalance:表示正在执行加入组操作的消费者组 CompletingRebalance:表示等待 Leader 成员制定分配方案的消费者组 Stable:表示已完成 Rebalance 操作可正常工作的消费者组 Dead:表示当前无成员且元数据信息被删除的消费者组。 GroupMetadata 类:组元数据类。该 scala 文件下最重要的类文件。 GroupMetadata 伴生对象:该对象定义了 loadGroup 方法用于创建 GroupMetadata 实例,并定义了用于存储消费者组合法前置状态的变量 validPreviousStates GroupOverview 类:定义了非常简略的消费者组概览信息。 GroupSummary 类:与 MemberSummary 类类似,它定义了消费者组的概要信息。 CommitRecordMetadataAndOffset 类:保存写入到位移主题中的消息的位移值,以及其他元数据信息。这个类的主要职责就是保存位移值。
private[group] sealed trait GroupStateprivate[group] case object PreparingRebalance extends GroupState...
/*** transition: some members have joined by the timeout => CompletingRebalance* all members have left the group => Empty* group is removed by partition emigration => Dead*/
这里说明了 PreparingRebalance 状态转换的时机如下:
当有成员加入时,转换为 CompletingRebalance 状态
当所有组成员退出时,转换为 Empty 状态
当消费者组被移除时,转换为 Dead 状态
一个消费者组从创建到正常工作,它的状态流转路径是:Empty -> PreparingRebalance -> CompletingRebalance -> Stable。
GroupOverview 类:
定义了消费者组的概览信息,比如执行 kafka-consumer-groups.sh --list 的时候,Kafka 就会创建 GroupOverview 实例返回给命令行。
case class GroupOverview(groupId: String,//消费者组id,即:group.id 参数的值protocolType: String//协议类型)
GroupSummary 类:
同样定义了消费者组的概览信息:
case class GroupSummary(state: String,//消费者组状态protocolType: String,//协议类型protocol: String,//消费者组选定的分区分配策略members: List[MemberSummary]//消费者组成员元数据)
GroupMetadata 伴生对象:
private object GroupMetadata {//用于存储各种状态的合法前置状态private val validPreviousStates: Map[GroupState, Set[GroupState]] =Map(Dead -> Set(Stable, PreparingRebalance, CompletingRebalance, Empty, Dead),CompletingRebalance -> Set(PreparingRebalance),Stable -> Set(CompletingRebalance),PreparingRebalance -> Set(Stable, CompletingRebalance, Empty),Empty -> Set(PreparingRebalance))...}
validPreviousStates:用于存储消费者组各个状态的合法前置状态。key 是状态,value 是该状态对应的合法前置状态的集合
GroupMetadata 类:
private[group] class GroupMetadata(val groupId: String, //消费者组idinitialState: GroupState, //消费者组初始状态time: Time) extends Logging {type JoinCallback = JoinGroupResult => Unitprivate[group] val lock = new ReentrantLock// 组状态private var state: GroupState = initialState// 记录状态最近一次变更的时间戳var currentStateTimestamp: Option[Long] = Some(time.milliseconds())//协议类型var protocolType: Option[String] = None//记录rebalance的次数var generationId = 0// 记录消费者组的Leader成员,可能不存在private var leaderId: Option[String] = None//分区分配策略private var protocol: Option[String] = None// 成员元数据列表信息private val members = new mutable.HashMap[String, MemberMetadata]// Static membership mapping [key: group.instance.id, value: member.id]// 静态成员Id列表private val staticMembers = new mutable.HashMap[String, String]private val pendingMembers = new mutable.HashSet[String]private var numMembersAwaitingJoin = 0// 分区分配策略支持票数private val supportedProtocols = new mutable.HashMap[String, Integer]().withDefaultValue(0)// 保存消费者组订阅分区的提交位移值private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata]private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]()private var receivedTransactionalOffsetCommits = falseprivate var receivedConsumerOffsetCommits = falsevar newMemberAdded: Boolean = false...}
currentStateTimestamp:记录最近一次状态变更的时间戳,用于确定位移主题中的过期消息。位移主题中的消息也要遵循 Kafka 的留存策略,所有当前时间与该字段的差值超过了留存阈值的消息都被视为“已过期”(Expired)。 generationId:消费者组 Generation 号。Generation 等同于消费者组执行过 Rebalance 操作的次数,每次执行 Rebalance 时,Generation 数都要加 1。 leaderId:消费者组中 Leader 成员的 Member ID 信息。当消费者组执行 Rebalance 过程时,需要选举一个成员作为 Leader,负责为所有成员制定分区分配方案。在 Rebalance 早期阶段,这个 Leader 可能尚未被选举出来。 members:保存消费者组下所有成员的元数据信息。Key 是成员的 member ID 字段,Value 是 MemberMetadata 类型,保存了成员的元数据信息。 offsets:保存按照主题分区分组的位移主题消息位移值的 HashMap。Key 是主题分区,Value 是前面讲过的 CommitRecordMetadataAndOffset 类型。当消费者组成员向 Kafka 提交位移时,会向这个字段插入对应的记录。 supportedProtocols:保存分区分配策略的支持票数。HashMap 类型,Key 是分配策略的名称,Value 是支持的票数。每个成员可以选择多个分区分配策略,因此,假设成员 A 选择[“range”,“round-robin”]、B 选择[“range”]、C 选择[“round-robin”,“sticky”],那么这个字段就有 3 项,分别是:<“range”,2>、<“round-robin”,2> 和 <“sticky”,1>。
//判断消费者组的当前状态是否为指定状态def is(groupState: GroupState) = state == groupState//判断消费者组的当前状态不是指定状态def not(groupState: GroupState) = state != groupState//获取消费者组当前状态def currentState = state//消费者组能否Rebalance的条件是当前状态是PreparingRebalance状态的合法前置状态def canRebalance = GroupMetadata.validPreviousStates(PreparingRebalance).contains(state)//设置/更新消费者组状态def transitionTo(groupState: GroupState) {//验证目标状态的合法性assertValidTransition(groupState)//更新/设置消费者组状态state = groupState//设置更新状态的时间戳currentStateTimestamp = Some(time.milliseconds())}
transitionTo 方法:作用是将组状态变更成给定状态。在变更前,需要验证目标状态的合法性,依据就是看 GroupMetadata 伴生对象中定义的各个状态的合法前置状态,如果当前状态是目标状态的前置状态,则视为合法的状态转换。 然后更新状态变更的时间戳,Kafka 有个定时任务,会定期清除过期的消费者组位移数据,它就是依靠这个时间戳字段,来判断过期与否的。 canRebalance 方法:用于判断组是否能够开启 Rebalance 操作。判断依据是,当前状态是否是 PreparingRebalance 状态的合法前置状态。只有 Stable、CompletingRebalance 和 Empty 这 3 类状态的消费者组,才有资格开启 Rebalance。 is 和 not 方法:分别判断组的状态与给定状态是否一致,主要被用于执行状态校验。
def add(member: MemberMetadata, callback: JoinCallback = null) {//如果members为空,说明添加的是第一个消费者if (members.isEmpty)// 就把该成员的protocolType设置为消费者组的protocolTypethis.protocolType = Some(member.protocolType)// 确保成员元数据中的groupId和组Id相同assert(groupId == member.groupId)// 确保成员元数据中的protoclType和组protocolType相同assert(this.protocolType.orNull == member.protocolType)// 确保该成员选定的分区分配策略与组选定的分区分配策略相匹配assert(supportsProtocols(member.protocolType, MemberMetadata.plainProtocolSet(member.supportedProtocols)))// 如果尚未选出Leader成员if (leaderId.isEmpty)// 把该成员设定为Leader成员leaderId = Some(member.memberId)// 将该成员添加进membersmembers.put(member.memberId, member)// 更新分区分配策略支持票数member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) += 1 }// 设置成员加入组后的回调逻辑member.awaitingJoinCallback = callback// 更新已加入组的成员数if (member.isAwaitingJoin)numMembersAwaitingJoin += 1}
第一步:判断当前组是否为空,如果为空,说明待添加的成员是该组的第一个成员,则把该成员的protocolType 设置为组的 protocolType。对于普通的消费者,protocolType 就是字符串"consumer"。 第二步:验证待加入成员的元数据的合法性,包括 groupId 组ID、protocolType 以及该成员选定的分区分配策略是否与组选定的分区分配策略相匹配。 第三步:判断是否选出 Leader 成员,如果没有则把待加入成员设置为 Leader 成员。这里的 Leader 成员,是指消费者组下的一个成员。该成员负责为所有组成员制定分区分配方案,分配依据就是消费者组选定的分区分配策略。 第四步:将该成员加入组,即添加到 members 变量中 第五步:更新消费者组分区分配策略支持的票数。 第六步:设置成员加入组后的回调逻辑 第七步:更新已经加入组的成员的数量
移除成员:remove
def remove(memberId: String) {//第一步:将成员从 members 变量中移除members.remove(memberId).foreach { member =>//第二步:更新分区分配策略支持票数member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) -= 1 }//第三步:更新组中成员数量if (member.isAwaitingJoin)numMembersAwaitingJoin -= 1}//第四步:如果该成员是Leader,则重选Leader成员,策略就是选剩余成员列表中的第一个为Leaderif (isLeader(memberId))leaderId = members.keys.headOption}
第一步:将成员从 members 变量中移除 第二步:更新分区分配策略支持票数 第三步:更新组中成员数量 第四步:如果该成员是Leader,则重选Leader成员,策略就是选剩余成员列表中的第一个为Leader
//判断给定成员是否在组中def has(memberId: String) = members.contains(memberId)//根据成员ID获取成员def get(memberId: String) = members(memberId)//或者组成员的数量def size = members.size
2.3 偏移量(位移)管理
/ 保存消费者组订阅分区的提交位移值private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]
该变量的 Key 是 TopicPartition 类型,表示一个主题分区, Value 是 CommitRecordMetadataAndOffset 类型。该类封装了位移提交消息的位移值。
case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offsetAndMetadata: OffsetAndMetadata) {def olderThan(that: CommitRecordMetadataAndOffset): Boolean = appendedBatchOffset.get < that.appendedBatchOffset.get}
appendedBatchOffset:保存的是位移主题消息自己的位移值 offsetAndMetadata:保存的是位移提交消息中保存的消费者组的位移值
添加位移值
def initializeOffsets(offsets: collection.Map[TopicPartition, CommitRecordMetadataAndOffset],pendingTxnOffsets: Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]) {//将给定的一组订阅分区提交位移值加到 offsets 中this.offsets ++= offsets//更新 pendingTransactionalOffsetCommitsthis.pendingTransactionalOffsetCommits ++= pendingTxnOffsets}
当消费者组的协调者组件启动时,它会创建一个异步任务,定期地读取位移主题(_consumer_offsets)中相应消费者组的提交位移数据,并把它们加载到 offsets 字段中
② onOffsetCommitAppend
def onOffsetCommitAppend(topicPartition: TopicPartition, offsetWithCommitRecordMetadata: CommitRecordMetadataAndOffset) {if (pendingOffsetCommits.contains(topicPartition)) {if (offsetWithCommitRecordMetadata.appendedBatchOffset.isEmpty)throw new IllegalStateException("Cannot complete offset commit write without providing the metadata of the record " +"in the log.")// offsets字段中没有该分区位移提交数据,或者// offsets字段中该分区对应的提交位移消息在位移主题中的位移值小于待写入的位移值if (!offsets.contains(topicPartition) || offsets(topicPartition).olderThan(offsetWithCommitRecordMetadata))// 将该分区对应的提交位移消息添加到offsets中offsets.put(topicPartition, offsetWithCommitRecordMetadata)}pendingOffsetCommits.get(topicPartition) match {case Some(stagedOffset) if offsetWithCommitRecordMetadata.offsetAndMetadata == stagedOffset =>pendingOffsetCommits.remove(topicPartition)case _ =>// The pendingOffsetCommits for this partition could be empty if the topic was deleted, in which case// its entries would be removed from the cache by the `removeOffsets` method.}}
该方法用于将提交位移消息的位移值写入 offsets 变量。执行写入的判断依据是:
offsets 中不包含该主题分区对应的消息值
offsets 中该分区对应的提交位移消息在位移主题中的位移值小于待写入的位移值。
如果是的话,就把该主题已提交位移消息的位移值添加到 offsets 中。
③ completePendingTxnOffsetCommit
该方法的作用是完成一个待决事务(Pending Transaction)的位移提交。这里暂时不做分析
由于 _consumer_offsets 主题会将存储时间超过阈值的消息进行清理,即当前时间与已提交位移消息的时间戳的差值,超过了 Broker 端参数 offsets.retention.minutes 值,那么该消息会被清理,同时要移除 offsets 中该消息对应的数据。
GroupMetadata.removeExpiredOffsets 方法就是用来移除 offsets 中已清理提交位移消息的数据的,其逻辑如下:
def removeExpiredOffsets(currentTimestamp: Long, offsetRetentionMs: Long): Map[TopicPartition, OffsetAndMetadata] = {//用于获取订阅分区过期的位移值def getExpiredOffsets(baseTimestamp: CommitRecordMetadataAndOffset => Long): Map[TopicPartition, OffsetAndMetadata] = {// 遍历offsets中的所有分区,过滤出同时满足以下2个条件的分区// 条件1:该主题分区已经完成位移提交// 条件2:该主题分区在位移主题中对应消息的存在时间超过了阈值offsets.filter {case (topicPartition, commitRecordMetadataAndOffset) =>!pendingOffsetCommits.contains(topicPartition) && {//获取位移消息中的时间戳commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp match {//如果没有时间戳case None =>// current version with no per partition retentioncurrentTimestamp - baseTimestamp(commitRecordMetadataAndOffset) >= offsetRetentionMs//如果获取到了时间戳case Some(expireTimestamp) =>//判断条件就是方法传入的currentTimestamp大于等于消息中的时间戳currentTimestamp >= expireTimestamp}}}.map {case (topicPartition, commitRecordOffsetAndMetadata) =>//将位移值对象返回(topicPartition, commitRecordOffsetAndMetadata.offsetAndMetadata)}.toMap}// 调用getExpiredOffsets方法获取主题分区的过期位移val expiredOffsets: Map[TopicPartition, OffsetAndMetadata] = protocolType match {case Some(_) if is(Empty) =>//传入更新为当前组状态的时间戳getExpiredOffsets(commitRecordMetadataAndOffset =>currentStateTimestamp.getOrElse(commitRecordMetadataAndOffset.offsetAndMetadata.commitTimestamp))//如果没有定义协议类型case None =>getExpiredOffsets(_.offsetAndMetadata.commitTimestamp)case _ =>Map()}if (expiredOffsets.nonEmpty)debug(s"Expired offsets from group '$groupId': ${expiredOffsets.keySet}")//将过期位移对应的主题分区从offsets中移除offsets --= expiredOffsets.keySet//返回主题分区对应的过期位移expiredOffsets}
第一步:定义了一个内部方法 getExpiredOffsets 用于获取订阅主题分区过期的位移值。过期位移的判定条件有两个,需要同时满足: 该主题分区已经完成位移提交,即该主题分区不在 pendingOffsetCommits 结构中 该主题分区在位移主题中对应消息的存在时间超过了阈值。这里会获取消息中存储的过期时间戳,如果获取到了,直接判断当前时间戳是否大于等于消息中的过期时间戳。如果没有获取到,则获取当前时间和当前组状态的更新时间的差值,判断其是否大于等于 offsets.retention.minutes 的值。 第二步:根据 protocolType 和组状态,向第一步定义的方法中传入对应的参数,来获取主题分区的过期位移 如果消费者组状态是 Empty,就传入组变更为 Empty 状态的时间,若该时间没有被记录,则使用提交位移消息本身的写入时间戳,来获取过期位移 如果 protocolType 为 None,就表示,这个消费者组其实是一个 Standalone 消费者,依然使用提交位移消息本身的写入时间戳,来决定过期位移值 如果消费者组的状态不符合刚刚说的这些情况,那就说明,没有过期位移值需要被移除。 第三步:将过期位移对应的主题分区从 offsets 中移除 第四步:返回主题分区对应的过期位移
2.4分区分配策略的管理
private def candidateProtocols = {//找出支持票数=总成员数的策略,返回它们的名称val numMembers = members.sizesupportedProtocols.filter(_._2 == numMembers).map(_._1).toSet}
该方法会先获取组的成员个数,然后获取票数和成员个数一致的分区分配策略,也就是所有的成员都支持的分配策略
② selectProtocol:用于投票选举出公认的分区分配策略
def selectProtocol: String = {//如果没有成员,抛出异常if (members.isEmpty)throw new IllegalStateException("Cannot select protocol for empty group")// select the protocol for this group which is supported by all members//获取所有成员都支持的分配策略val candidates = candidateProtocols// let each member vote for one of the protocols and choose the one with the most votes//让每个成员投票,票数最多的那个策略当选val votes: List[(String, Int)] = allMemberMetadata.map(_.vote(candidates)).groupBy(identity).mapValues(_.size).toListvotes.maxBy(_._2)._1}
第一步:判断组中是否有成员,如果没有则直接抛异常 第二步:调用上面的 candidateProtocols 获取所有成员都支持的分区分配策略 第三步:让每个成员对分区分配策略进行投票,票数最多的当选
def vote(candidates: Set[String]): String = {//找到当前成员配置的分区分配策略中,第一个在给定的分区分配策略中的supportedProtocols.find({ case (protocol, _) => candidates.contains(protocol)}) match {case Some((protocol, _)) => protocolcase None =>throw new IllegalArgumentException("Member does not support any of the candidate protocols")}}
总结:
GroupMetadata:存储消费者组的元数据 MemberMetadata:存储消费者组中成员的元数据
currentStateTimestamp:最近状态变更的时间戳 generationId:执行过 Rebalance 操作的次数 leaderId:消费者组中 Leader 成员的 Member ID 信息 members:保存消费者组下所有成员的元数据信息。 offsets:保存主题分区和位移主题消息位移值的对应关系 supportedProtocols:保存消费者组中分区分配策略的投票数
memberId:标识消费者组成员ID groupInstanceId:消费者组静态成员的 ID clientId:消费者组成员配置的 client.id 参数 clientHost:运行消费者程序的主机名 metadata:消费者组成员分区分配策略的字节数组 assignment:保存分配给该消费者成员的订阅分区 rebalanceTimeoutMs:Rebalance 操作的超时时间 sessionTimeoutMs:会话超时时间 protocolType:协议类型 supportedProtocols:成员配置的多组分区分配策略
对消费者组状态的管理 对消费者组成员的管理 对消费位移的管理 对分区分配策略的管理
参考:极客时间《Kafka核心源码解读》




