消息中间件是每个做后台同学必须要掌握的一类框架,这主要取决于其广泛应用于互联网项目。消息中间件在这些系统中扮演着很重要的角色,它们的主要作用是消息异步,系统解耦,高并发削峰,分布式事务等等。目前主要的消息中间件有rabbitMQ、kafka、rocketMQ、ActiveMQ等,本系列文章总结的是kafka,也算是当前市面上比较流行的消息中间件,后续的文章会从kafka的生产者、消费者、broker等来总结。除了在实际应用中,消息中间件是一个常用的框架,在面试中,消息中间件也是必问内容。由于个人能力有限,文中难免有理解不到位的地方,还请留言指导,在此谢过。本系列文章kafka版本使用最新的2.8.0。
ConsumerPartitionAssignor
在上一篇文章中,我们介绍了消费者的重平衡流程,本节中,我们继续介绍重平衡相关内容--分配策略。如果对上一文内容还有印象的话,应该知道leader在接收到joinGroup的响应之后,会进行分区重新分配,将分配好的结果给coordinator。至于如何分配,上一文并没有说明,本文重点介绍一下分配的方法。分配过程存在两个协议,一个是EAGER,一个是COOPERATIVE。在2.4之前实现的都是EAGER协议,它的整个分配过程是一次的,存在消费者不能消费的情况(stop-the-world),为了解决这种情况,在2.4之后,增加了COOPERATIVE和static membership方案。COOPERATIVE协议是一个渐进重平衡过程,它是在一个不中断当前消费者消费的情况下逐步增加分区的过程。除了以上两者协议之外,还增加了一个配置项static membership来调节分配时间。static membership 是通过设置消费者超时时间来停止重平衡的过程,当消费者被设值为static membership的话,只要在规定时间(自定义30min以内)重新心跳就不会发生重平衡的过程。
EAGER过程:
这个就是上一节讲的内容,两个阶段:join group 和 synchronizing group。在join group的阶段,消费者会上报当前的消费分区,等待协调者返回分配结果。在这个过程中,消费者会放弃当前的分区,导致消息无法消费,这也会造成stop-the-world。
COOPERATIVE过程:
在join group的时候不会放弃当前分区,直接上报当前的消费分区。在这个过程中,消费者可以继续消费消息。当协调者获取到消费者上报的分区之后,对比元数据根据尽量保持上一次分配的原则将之前已经分配给消费者的分区发送下去,让消费者继续消费,这就完成了第一次重平衡过程。然后再将需要分配的分区重新分配给消费者,消费者接受到之后,增加消费分区达到第二次重平衡过程。
消费者重平衡分配策略是通过接口定义的:ConsumerPartitionAssignor,通过该接口我们可以看到这个接口的具体实现类如下:

从上面可以看到主要的实现方式有四种:Range,RoundRobin,CooperativeSticky,Sticky。我们先说明一下这几种分配策略是如何运转的。
Range:按照topic维度进行平均分配,将每个topic的partition按照顺序编排好,然后按照消费者数量开始平均分配。假如有一个topic,有m个分区,n个消费者。那么每个消费者至少消费m/n,前面的m%n的消费者多消费一个。eg:
两个topic,t1,t2分别有三个分区:t1p0,t1p1,t1p2;t2p0,t2p1,t2p2;并且有两个消费者c1,c2。
分配结果:c1[t1p0,t1p1,t2p0,t2p1],c2[t1p2,t2p2]。
RoundRobin:按照分区的维度进行平均分配,将所有的topic的分区数加起来,然后按照消费者数量平均分配。假如某个消费者组,共监听m个分区,n个消费者。那么每个消费者至少消费m/n,前面的m%n的消费者多消费一个。
eg:
两个topic,t1,t2分别有三个分区:t1p0,t1p1,t1p2;t2p0,t2p1,t2p2;并且有两个消费者c1,c2。
分配结果:c1[t1p0,t1p2,t2p1],c2[t1p1,t2p0,t2p2]。
这里有一个概念我们要提醒一下:kafka的分配是基于消费者组和组内成员得监听情况进行分配的。
eg:有三个成员c1,c2,c3,其中c1监听了t1,c2监听了t2,t1,c3监听了t1,t2,t3。其中t1有1个分区,t2有两个分区,t3有两个分区。
分配结果是:c1[t1p0],c2[t2p0],c3[t2p1,t3p0,t3p1]。
显然这个不是很均衡,我们可以将t2p1分配给c2。虽然是同一个消费者组,但是不管怎么reblance,c1都只会消费t1的消息。这个在实际开发中经常碰到,消费者都使用同一个消费者组名,然后T1和T2的消费逻辑是完全不同的,我们并不需要担心消费t1的消费者会被分配到消费t2上。
Sticky:这个称作粘性分配,其目的有两个:1)分区尽可能和上一次分配结果保持一致 2)分配尽可能均匀,每个消费者消费的分区数最多相差1。当两个目标发生冲突的时候,第一个目标优于第二个。
eg:我们有一个消费者,有三个成员c1,c2,c3,其中c1监听了t1,c2监听了t2,t1,c3监听了t1,t2,t3。其中t1有1个分区,t2有两个分区,t3有两个分区。
分配结果是:c1[t1p0],c2[t2p0,t2p1],c3[t3p0,t3p1]。
上一个例子说明的是第二个特性,再举个例子说明一下第一个特性,
eg:四个topic,t1,t2,t3,t4分别有两个分区:t1p0,t1p1;t2p0,t2p0;t3p0,t3p1;t4p0,t4p1,有三个消费者c1,c2,c3监听。
分配结果:c1[t1p0,t2p1,t4p0],c2[t1p1,t3p0,t4p1],c3[t2p0,t3p1]。
这个分配结果与RoundRobin的一致。不过当消费者c1退出消费者组的时候,RoundRobin的分配结果是:
c2[t1p0,t2p0,t3p0,t4p0]c3[t1p1,t2p1,t3p1,t4p1]。
而使用Sticky的分配结果是:
c2[t1p1,t3p0,t4p1,t2p1]c3[t2p0,t3p1,t1p0,t4p0]。
这个例子说明的是在粘性的分配机制下,会默认保留原来的分配方案,将需要变化的分区进行分配。
CooperativeSticky:这个的分配逻辑是和Sticky一致的,和前面分配逻辑不同的是,前面的都是EAGER协议,CooperativeSticky有两个协议,一个是EAGER,一个是COOPERATIVE。EAGER协议和上面的Sticky分配规则一样,先会放弃所有的分区,等待协调者返回重新分配的分区结果。COOPERATIVE协议就不一样了,它是一个渐进重平衡过程,这个过程可以允许消费者继续保留当前的分区不变化,然后等待协调者重新分配增量的分区。
我们通过一个例子说明一下COOPERATIVE协议下的分配方案:
Eg:两个消费者c1和c2,一个topic三个分区p1,p2,p3。刚开始分配结果是c1[p1,p2],c2[p3]。此时增加了一个消费者c3,按照Sticky分配的结果是:
c1[p1],c2[p3],c3[p2]。不过,在这个过程中,会存在两次重平衡的过程,c1,c2上报分区,获得的分配结果是c1[p1],c2[p3],这个时候p2没有人消费。第二次重平衡的过程是将p2分配给p3。
下面我们看下几个分配规则的源码实现:
Range:按照topic进行分配
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
Map<String, List<TopicPartition>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet())
assignment.put(memberId, new ArrayList<>());
//按照topic进行分配
for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
String topic = topicEntry.getKey();
List<MemberInfo> consumersForTopic = topicEntry.getValue();
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic == null)
continue;
Collections.sort(consumersForTopic);
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
//均衡分配原则
List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start + length));
}
}
return assignment;
}
RoundRobin:按照分区进行分配
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
Map<String, List<TopicPartition>> assignment = new HashMap<>();
List<MemberInfo> memberInfoList = new ArrayList<>();
for (Map.Entry<String, Subscription> memberSubscription : subscriptions.entrySet()) {
assignment.put(memberSubscription.getKey(), new ArrayList<>());
memberInfoList.add(new MemberInfo(memberSubscription.getKey(),
memberSubscription.getValue().groupInstanceId()));
}
CircularIterator<MemberInfo> assigner = new CircularIterator<>(Utils.sorted(memberInfoList));
//对所有的分区进行统一分配
for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
final String topic = partition.topic();
while (!subscriptions.get(assigner.peek().memberId).topics().contains(topic))
assigner.next();
assignment.get(assigner.next().memberId).add(partition);
}
return assignment;
}
Sticky:该分配策略代码相对复杂,主要是实现在上一次分配的基础上,均衡分配。保证分区数相差小于等于1
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>();
//所有消费者都监听了相同的topic
if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions)) {
log.debug("Detected that all consumers were subscribed to same set of topics, invoking the "
+ "optimized assignment algorithm");
partitionsTransferringOwnership = new HashMap<>();
return constrainedAssign(partitionsPerTopic, consumerToOwnedPartitions);
} else {
//监听了不同的topic
log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the "
+ "general case assignment algorithm");
partitionsTransferringOwnership = null;
return generalAssign(partitionsPerTopic, subscriptions);
}
}
//相同topic的分配方案
private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic,
Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
//初始化未分配的分区
SortedSet<TopicPartition> unassignedPartitions = getTopicPartitions(partitionsPerTopic);
Set<TopicPartition> allRevokedPartitions = new HashSet<>();
//未填满的消费者
List<String> unfilledMembers = new LinkedList<>();
// 分配了最大分区数的消费者
Queue<String> maxCapacityMembers = new LinkedList<>();
//分配了最小分区数的消费者
Queue<String> minCapacityMembers = new LinkedList<>();
int numberOfConsumers = consumerToOwnedPartitions.size();
//获取消费者消费分区的最大值和最小值,两种相差小于等于1
int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers);
int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers);
// 初始化分配结果
Map<String, List<TopicPartition>> assignment = new HashMap<>(
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota))));
// 针对每个消费者,尽可能按照之前的分配方式分配到最大值
for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
String consumer = consumerEntry.getKey();
List<TopicPartition> ownedPartitions = consumerEntry.getValue();
List<TopicPartition> consumerAssignment = assignment.get(consumer);
int i = 0;
// 分配到最大数
for (TopicPartition tp : ownedPartitions) {
if (i < maxQuota) {
consumerAssignment.add(tp);
unassignedPartitions.remove(tp);
} else {
allRevokedPartitions.add(tp);
}
++i;
}
//如果还没有最小的分区数那么多,就添加到未填满的消费者集合中
if (ownedPartitions.size() < minQuota) {
unfilledMembers.add(consumer);
} else {
//已经分配最小数量
if (consumerAssignment.size() == minQuota)
minCapacityMembers.add(consumer);
//已经分配最大数量
if (consumerAssignment.size() == maxQuota)
maxCapacityMembers.add(consumer);
}
}
Collections.sort(unfilledMembers);
Iterator<TopicPartition> unassignedPartitionsIter = unassignedPartitions.iterator();
// 将未填满的消费者填满到最小值
while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
while (unfilledConsumerIter.hasNext()) {
String consumer = unfilledConsumerIter.next();
List<TopicPartition> consumerAssignment = assignment.get(consumer);
if (unassignedPartitionsIter.hasNext()) {
TopicPartition tp = unassignedPartitionsIter.next();
consumerAssignment.add(tp);
unassignedPartitionsIter.remove();
// We already assigned all possible ownedPartitions, so we know this must be newly to this consumer
if (allRevokedPartitions.contains(tp))
partitionsTransferringOwnership.put(tp, consumer);
} else {
break;
}
if (consumerAssignment.size() == minQuota) {
minCapacityMembers.add(consumer);
unfilledConsumerIter.remove();
}
}
}
//如果没填满的消费者举例最小分配数的还有很多,从最大分配数中取出分区转移给这些消费者,达到一个最小分配数
for (String consumer : unfilledMembers) {
List<TopicPartition> consumerAssignment = assignment.get(consumer);
int remainingCapacity = minQuota - consumerAssignment.size();
while (remainingCapacity > 0) {
String overloadedConsumer = maxCapacityMembers.poll();
if (overloadedConsumer == null) {
throw new IllegalStateException("Some consumers are under capacity but all partitions have been assigned");
}
TopicPartition swappedPartition = assignment.get(overloadedConsumer).remove(0);
consumerAssignment.add(swappedPartition);
--remainingCapacity;
// This partition is by definition transferring ownership, the swapped partition must have come from
// the max capacity member's owned partitions since it can only reach max capacity with owned partitions
partitionsTransferringOwnership.put(swappedPartition, consumer);
}
minCapacityMembers.add(consumer);
}
//将未分配的分区往最小的分区里面分配
for (TopicPartition unassignedPartition : unassignedPartitions) {
String underCapacityConsumer = minCapacityMembers.poll();
if (underCapacityConsumer == null) {
throw new IllegalStateException("Some partitions are unassigned but all consumers are at maximum capacity");
}
// We can skip the bookkeeping of unassignedPartitions and maxCapacityMembers here since we are at the end
assignment.get(underCapacityConsumer).add(unassignedPartition);
if (allRevokedPartitions.contains(unassignedPartition))
partitionsTransferringOwnership.put(unassignedPartition, underCapacityConsumer);
}
return assignment;
}
private Map<String, List<TopicPartition>> generalAssign(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
Map<String, List<TopicPartition>> currentAssignment = new HashMap<>();
Map<TopicPartition, ConsumerGenerationPair> prevAssignment = new HashMap<>();
partitionMovements = new PartitionMovements();
prepopulateCurrentAssignments(subscriptions, currentAssignment, prevAssignment);
// partitionConsumer map 存储
final Map<TopicPartition, List<String>> partition2AllPotentialConsumers = new HashMap<>();
// consumerPartition map 存储
final Map<String, List<TopicPartition>> consumer2AllPotentialPartitions = new HashMap<>();
// 初始化partition2AllPotentialConsumers
for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
for (int i = 0; i < entry.getValue(); ++i)
partition2AllPotentialConsumers.put(new TopicPartition(entry.getKey(), i), new ArrayList<>());
}
//初始化consumer2AllPotentialPartitions
for (Entry<String, Subscription> entry: subscriptions.entrySet()) {
String consumerId = entry.getKey();
consumer2AllPotentialPartitions.put(consumerId, new ArrayList<>());
entry.getValue().topics().stream().filter(topic -> partitionsPerTopic.get(topic) != null).forEach(topic -> {
for (int i = 0; i < partitionsPerTopic.get(topic); ++i) {
TopicPartition topicPartition = new TopicPartition(topic, i);
consumer2AllPotentialPartitions.get(consumerId).add(topicPartition);
partition2AllPotentialConsumers.get(topicPartition).add(consumerId);
}
});
// add this consumer to currentAssignment (with an empty topic partition assignment) if it does not already exist
if (!currentAssignment.containsKey(consumerId))
currentAssignment.put(consumerId, new ArrayList<>());
}
// 存入当前的partition Consumer
Map<TopicPartition, String> currentPartitionConsumer = new HashMap<>();
for (Map.Entry<String, List<TopicPartition>> entry: currentAssignment.entrySet())
for (TopicPartition topicPartition: entry.getValue())
currentPartitionConsumer.put(topicPartition, entry.getKey());
List<TopicPartition> sortedPartitions = sortPartitions(partition2AllPotentialConsumers);
List<TopicPartition> unassignedPartitions = new ArrayList<>(sortedPartitions);
boolean revocationRequired = false;
//将currentPartitionConsumer unassignedPartitions等前置准备
for (Iterator<Entry<String, List<TopicPartition>>> it = currentAssignment.entrySet().iterator(); it.hasNext();) {
Map.Entry<String, List<TopicPartition>> entry = it.next();
if (!subscriptions.containsKey(entry.getKey())) {
// 之前消费者存在,现在不存在了
for (TopicPartition topicPartition: entry.getValue())
currentPartitionConsumer.remove(topicPartition);
it.remove();
} else {
// 当前消费者还存在
for (Iterator<TopicPartition> partitionIter = entry.getValue().iterator(); partitionIter.hasNext();) {
TopicPartition partition = partitionIter.next();
if (!partition2AllPotentialConsumers.containsKey(partition)) {
// if this topic partition of this consumer no longer exists remove it from currentAssignment of the consumer
partitionIter.remove();
currentPartitionConsumer.remove(partition);
} else if (!subscriptions.get(entry.getKey()).topics().contains(partition.topic())) {
// if this partition cannot remain assigned to its current consumer because the consumer
// is no longer subscribed to its topic remove it from currentAssignment of the consumer
partitionIter.remove();
revocationRequired = true;
} else
// otherwise, remove the topic partition from those that need to be assigned only if
// its current consumer is still subscribed to its topic (because it is already assigned
// and we would want to preserve that assignment as much as possible)
unassignedPartitions.remove(partition);
}
}
}
// at this point we have preserved all valid topic partition to consumer assignments and removed
// all invalid topic partitions and invalid consumers. Now we need to assign unassignedPartitions
// to consumers so that the topic partition assignments are as balanced as possible.
// an ascending sorted set of consumers based on how many topic partitions are already assigned to them
TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment));
sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
//分配函数 :分配过程分为初次分配,和再次均衡分配
balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions,
consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer, revocationRequired);
return currentAssignment;
}
本文主要介绍了消费者分配策略,分配策略有三种,Range,RoundRobin,Sticky。Range,RoundRobin是两个原始的分配方案,分别是按照topic、partition分配,Sticky是一种粘性分配方案,主要是为了和上一次的分配尽可能保持一致,并且保证分区数不超过1。不过,这些分配方案都会产生stop-the-world的问题。所以sticky又增加了一个渐进分配的过程:CooperativeSticky。CooperativeSticky的诞生是分多次重平衡过程,减少某个topic全部不消费的问题,进而解决stop-the-world问题。
本文的内容就这么多,如果你觉得对你的学习和面试有些帮助,帮忙点个赞或者转发一下哈,谢谢。




