消息中间件是每个做后台同学必须要掌握的一类框架,这主要取决于其广泛应用于互联网项目。消息中间件在这些系统中扮演着很重要的角色,它们的主要作用是消息异步,系统解耦,高并发削峰,分布式事务等等。目前主要的消息中间件有rabbitMQ、kafka、rocketMQ、ActiveMQ等,本系列文章总结的是kafka,也算是当前市面上比较流行的消息中间件,后续的文章会从kafka的生产者、消费者、broker等来总结。除了在实际应用中,消息中间件是一个常用的框架,在面试中,消息中间件也是必问内容。由于个人能力有限,文中难免有理解不到位的地方,还请留言指导,在此谢过。本系列文章kafka版本使用最新的2.8.0。
消费者消费模式
消费者在启动的过程,需要去监听topic的消息,kafka提供了两种模式:订阅(subscribe)和分配(assign)。这两种模式的主要差别体现在是否需要协调者动态分配分区。订阅模式是消费者只指定订阅的topic,由协调者统一分配分区,这么做有两个好处:
1)消费者消费partition的数量相对均衡,保证每个分区的消费速度大致一样;
2)消费者如果出现异常,心跳断连,那么会有新的消费者来消费该分区;
第二点对于一个高可用的系统来说至关重要,我们不能因为某一台机器出现问题导致整个集群出现问题。assign模式是指定对应的消费分区,该分区只能由该消费者来消费,这种做法在某些特定的场景下是比较有作用的。那是不是意味着我们不需要assign模式?那倒也不是,flink不就使用的是assign模式么。只能说在绝大部分的场景下我们都使用订阅模式,非要使用分配模式,请慎重。
消费者订阅模式(subscribe)
上面已经介绍了订阅模式和分配模式的区别,现在我们来重点看下其核心的实现逻辑。
KafkaConsumer#subscribe:
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
//加锁
acquireAndEnsureOpen();
try {
//前面各种异常检查,包含groupId,topics,协调者,值得注意的是如果topics为空的话,就当做调用unsubscribe来处理
//...
//清空本地缓存之前的topic分配的数据
fetcher.clearBufferedDataForUnassignedTopics(topics);
//如果订阅成功
if (this.subscriptions.subscribe(new HashSet<>(topics), listener))
//更新元数据
metadata.requestUpdateForNewTopics();
//...
} finally {
//释放锁
release();
}
}
上面的逻辑比较简单,也就几个步骤:
1) 加锁
2) 分配前check,这里会check多项,包括topic,groupId,协调器;之前我们提到了在需要进行reblance的时候,都是协调者进行分配的,所以这里要check一下。如果topic为空的话,这里会进行反订阅操作,会把之前所有订阅的topic清空
3) 进行订阅topic
4) 订阅成功,更新元数据
5) 释放锁
加锁和释放锁的过程
上面的加锁和释放锁的过程和我们平时看到的加锁不一样,它并非使用的是lock,而是使用一种无锁操作的原子更新器来实现。
KafkaConsumer#acquire和KafkaConsumer#release:
//设置的初始值为无线程
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
//设置为原子整型
private final AtomicInteger refcount = new AtomicInteger(0);
//加锁
private void acquire() {
long threadId = Thread.currentThread().getId();
//如果当前线程id和执行的线程是一个;设置无锁为当前线程
if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
//次数加1
refcount.incrementAndGet();
}
//释放锁
private void release() {
//如果为0的话,就设置为无锁
if (refcount.decrementAndGet() == 0)
currentThread.set(NO_CURRENT_THREAD);
}
整个加锁的过程是:
1) 获取当前的线程id
2) 判断是否加锁成功,判断有两种情况,第一种:如果获取的执行线程和之前存入的线程id是一样的,加锁成功,这种情况是一种可重复锁的思维。如果加锁成功,加锁计数加1。第二种:如果设置初始值为当前线程,加锁成功,这种是对于没有线程加锁的情况,先cas设置加锁。
3) 前面的加锁成功之后,加锁计数加1。
上面的实现本身是一个轻量级的可重入锁的实现过程,这就要求每个消费者最好使用一个单独的线程执行操作。如果在使用多消费者的情况下,请确保每个消费者是一个单独的线程在执行。释放锁的过程简单,看下代码即可。有时候我们在开发的过程中,要确保某件事只有一个线程进行执行的话,上面是一个很不错的思路,无需加锁,非常轻量级。
本地缓存数据清理
清空本地缓存的topic主要是将之前还在订阅的,但是新监听的topic不在订阅的数据进行清理,清理的数据有两部分:completedFetches和nextInLineFetch,completedFetches是从服务器拉取下来的消息,nextInLineFetch是准备给消费者的消息,这两个都是CompletedFetch类型,中间需要有层转换操作(这个在后续拉取消息的流程中进行详细说明)。具体的实现逻辑如下:
Fetcher#clearBufferedDataForUnassignedTopics
public void clearBufferedDataForUnassignedTopics(Collection<String> assignedTopics) {
//找出待分配的topic中目前已经订阅了的topic
Set<TopicPartition> currentTopicPartitions = new HashSet<>();
for (TopicPartition tp : subscriptions.assignedPartitions()) {
if (assignedTopics.contains(tp.topic())) {
currentTopicPartitions.add(tp);
}
}
//将接下来还要继续订阅的topic数据保留,其他的清理
clearBufferedDataForUnassignedPartitions(currentTopicPartitions);
}
public void clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> assignedPartitions) {
//获取已经拉取的消息,completedFetches 保存的是从服务器拉下来的消息
Iterator<CompletedFetch> completedFetchesItr = completedFetches.iterator();
while (completedFetchesItr.hasNext()) {
CompletedFetch records = completedFetchesItr.next();
TopicPartition tp = records.partition;
//如果不包含在订阅的topic中,消息清理
if (!assignedPartitions.contains(tp)) {
records.drain();
completedFetchesItr.remove();
}
}
//nextInLineFetch 是给消费者获取的消息,当nextInLineFetch还存在其他topic的,进行清理
if (nextInLineFetch != null && !assignedPartitions.contains(nextInLineFetch.partition)) {
nextInLineFetch.drain();
nextInLineFetch = null;
}
}
订阅topic
订阅topic主要实现的逻辑是:注册监听器,设置监听模式,修改订阅topic数据,如果topic没有发生改变,那么就不更新topic的相关数据,如果发生改变,置needPartialUpdate标志位为true,等待下一轮的更新。具体的实现逻辑如下:
#SubscriptionState.subscribe
//监听topic,返回是否发生改变
public synchronized boolean subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
registerRebalanceListener(listener);
setSubscriptionType(SubscriptionType.AUTO_TOPICS);
return changeSubscription(topics);
}
//进行对比
private boolean changeSubscription(Set<String> topicsToSubscribe) {
if (subscription.equals(topicsToSubscribe))
return false;
subscription = topicsToSubscribe;
return true;
}
//下次请求设置为立即更新
public synchronized int requestUpdateForNewTopics() {
// Override the timestamp of last refresh to let immediate update.
this.lastRefreshMs = 0;
this.needPartialUpdate = true;
this.requestVersion++;
return this.updateVersion;
}
除了上面使用传入的topic,kafka还支持正则匹配的传入topic,监听的逻辑和上面类似,有兴趣的同学可以去看下。
分配模式(assign)
和上面的订阅模式不同,分配模式需要传入的是topic和partition,指定消费者消费哪个分区,之后也不会在发生重平衡等操作。分配模式的实现逻辑和订阅模式都部分类似,主要的区别在于需要在进行分配之前将之前已经拉取的消息提交offset值,这么做的原因主要是由于后续不会重平衡,要保证自动提交的情况下已经拉取的消息提交了offset,防止消息重复消费。以下是这段代码的实现逻辑:
KafkaConsumer#assign
public void assign(Collection<TopicPartition> partitions) {
acquireAndEnsureOpen();
try {
//异常处理
//...
fetcher.clearBufferedDataForUnassignedPartitions(partitions);
// 提交offset
if (coordinator != null)
this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
//分配topic和partition
if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
//更新元数据
metadata.requestUpdateForNewTopics();
} finally {
release();
}
}
//开启自动提交时,提交offset
public void maybeAutoCommitOffsetsAsync(long now) {
//开启自动提交
if (autoCommitEnabled) {
//更新提交时间
nextAutoCommitTimer.update(now);
//提交时间已经过期
if (nextAutoCommitTimer.isExpired()) {
//重置下一次提交时间为自动 提交的间隔时间
nextAutoCommitTimer.reset(autoCommitIntervalMs);
//执行异步提交offset
doAutoCommitOffsetsAsync();
}
}
}
//指定订阅分区
public synchronized boolean assignFromUser(Set<TopicPartition> partitions) {
//设置订阅模式
setSubscriptionType(SubscriptionType.USER_ASSIGNED);
//如果订阅的和当前一致
if (this.assignment.partitionSet().equals(partitions))
return false;
// 更新订阅的topic
//...查找逻辑
this.assignment.set(partitionToState);
//设置监听的topic并返回是否需要更新数据
return changeSubscription(manualSubscribedTopics);
}
取消订阅
在上面的逻辑中,如果发现需要订阅的topic为空的话,就是取消之前所有订阅的topic数据。取消订阅的函数中需要做几个操作:fetcher清空数据、subscriptions清空数据、coordinator做离开集群的操作。具体的实现如下:
KafkaConsumer#unsubscribe
public void unsubscribe() {
//加锁
acquireAndEnsureOpen();
try {
//fetcher置空
fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());
if (this.coordinator != null) {
//coordinator发送leave准备,这个是指coordinator要离开这个集群
this.coordinator.onLeavePrepare();
//coordinator发送leaveGroup
this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
}
//subscriptions 设置取消订阅的相关信息
this.subscriptions.unsubscribe();
} finally {
//是否锁
release();
}
}
//清空所有的数据
public synchronized void unsubscribe() {
this.subscription = Collections.emptySet();
this.groupSubscription = Collections.emptySet();
this.assignment.clear();
this.subscribedPattern = null;
this.subscriptionType = SubscriptionType.NONE;
this.assignmentId++;
}
coordinator的相关操作我们后续在介绍,协调者对于消费者来说是一个很核心的内容。
本文主要介绍了消费者监听消息的模式,主要有两种模式:订阅和分配,订阅模式支持topic级别和正则表达式级别的订阅。两种的核心区别是订阅分自平衡,分配模式需要指定对应的分区。当订阅的topic为空的时候,就是取消所有订阅的topic。
本文的内容就这么多,如果你觉得对你的学习和面试有些帮助,帮忙点个赞或者转发一下哈,谢谢。




