暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

kafka--消费者消费模式(订阅/分配)

我的IT技术路 2021-09-16
3192

消息中间件是每个做后台同学必须要掌握的一类框架,这主要取决于其广泛应用于互联网项目。消息中间件在这些系统中扮演着很重要的角色,它们的主要作用是消息异步,系统解耦,高并发削峰,分布式事务等等。目前主要的消息中间件有rabbitMQkafkarocketMQActiveMQ等,本系列文章总结的是kafka,也算是当前市面上比较流行的消息中间件,后续的文章会从kafka的生产者、消费者、broker等来总结。除了在实际应用中,消息中间件是一个常用的框架,在面试中,消息中间件也是必问内容。由于个人能力有限,文中难免有理解不到位的地方,还请留言指导,在此谢过。本系列文章kafka版本使用最新的2.8.0

 

消费者消费模式

 

消费者在启动的过程,需要去监听topic的消息,kafka提供了两种模式:订阅(subscribe)和分配(assign)。这两种模式的主要差别体现在是否需要协调者动态分配分区。订阅模式是消费者只指定订阅的topic,由协调者统一分配分区,这么做有两个好处:

1)消费者消费partition的数量相对均衡,保证每个分区的消费速度大致一样;

2)消费者如果出现异常,心跳断连,那么会有新的消费者来消费该分区;

第二点对于一个高可用的系统来说至关重要,我们不能因为某一台机器出现问题导致整个集群出现问题。assign模式是指定对应的消费分区,该分区只能由该消费者来消费,这种做法在某些特定的场景下是比较有作用的。那是不是意味着我们不需要assign模式?那倒也不是,flink不就使用的是assign模式么。只能说在绝大部分的场景下我们都使用订阅模式,非要使用分配模式,请慎重。

 

消费者订阅模式(subscribe)

上面已经介绍了订阅模式和分配模式的区别,现在我们来重点看下其核心的实现逻辑。

KafkaConsumer#subscribe:

public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {

//加锁

        acquireAndEnsureOpen();

        try {

//前面各种异常检查,包含groupId,topics,协调者,值得注意的是如果topics为空的话,就当做调用unsubscribe来处理

//...

//清空本地缓存之前的topic分配的数据

            fetcher.clearBufferedDataForUnassignedTopics(topics);

//如果订阅成功

            if (this.subscriptions.subscribe(new HashSet<>(topics), listener))

//更新元数据

                metadata.requestUpdateForNewTopics();

//...

        } finally {

//释放锁

            release();

        }

    }

上面的逻辑比较简单,也就几个步骤:

1) 加锁

2) 分配前check,这里会check多项,包括topic,groupId,协调器;之前我们提到了在需要进行reblance的时候,都是协调者进行分配的,所以这里要check一下。如果topic为空的话,这里会进行反订阅操作,会把之前所有订阅的topic清空

3) 进行订阅topic

4) 订阅成功,更新元数据

5) 释放锁

 

加锁和释放锁的过程

上面的加锁和释放锁的过程和我们平时看到的加锁不一样,它并非使用的是lock,而是使用一种无锁操作的原子更新器来实现。

KafkaConsumer#acquire和KafkaConsumer#release:

//设置的初始值为无线程

private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);

//设置为原子整型

private final AtomicInteger refcount = new AtomicInteger(0);

//加锁

private void acquire() {

        long threadId = Thread.currentThread().getId();

//如果当前线程id和执行的线程是一个;设置无锁为当前线程

        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))

            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");

//次数加1

        refcount.incrementAndGet();

    }

//释放锁

private void release() {

//如果为0的话,就设置为无锁

        if (refcount.decrementAndGet() == 0)

            currentThread.set(NO_CURRENT_THREAD);

    }

 

整个加锁的过程是:

1) 获取当前的线程id

2) 判断是否加锁成功,判断有两种情况,第一种:如果获取的执行线程和之前存入的线程id是一样的,加锁成功,这种情况是一种可重复锁的思维。如果加锁成功,加锁计数加1。第二种:如果设置初始值为当前线程,加锁成功,这种是对于没有线程加锁的情况,先cas设置加锁。

3) 前面的加锁成功之后,加锁计数加1。

上面的实现本身是一个轻量级的可重入锁的实现过程,这就要求每个消费者最好使用一个单独的线程执行操作。如果在使用多消费者的情况下,请确保每个消费者是一个单独的线程在执行。释放锁的过程简单,看下代码即可。有时候我们在开发的过程中,要确保某件事只有一个线程进行执行的话,上面是一个很不错的思路,无需加锁,非常轻量级

 

本地缓存数据清理

清空本地缓存的topic主要是将之前还在订阅的,但是新监听的topic不在订阅的数据进行清理,清理的数据有两部分:completedFetches和nextInLineFetchcompletedFetches是从服务器拉取下来的消息,nextInLineFetch是准备给消费者的消息,这两个都是CompletedFetch类型,中间需要有层转换操作(这个在后续拉取消息的流程中进行详细说明)。具体的实现逻辑如下:

Fetcher#clearBufferedDataForUnassignedTopics

public void clearBufferedDataForUnassignedTopics(Collection<String> assignedTopics) {

//找出待分配的topic中目前已经订阅了的topic

        Set<TopicPartition> currentTopicPartitions = new HashSet<>();

        for (TopicPartition tp : subscriptions.assignedPartitions()) {

            if (assignedTopics.contains(tp.topic())) {

                currentTopicPartitions.add(tp);

            }

        }

//将接下来还要继续订阅的topic数据保留,其他的清理

        clearBufferedDataForUnassignedPartitions(currentTopicPartitions);

    }

public void clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> assignedPartitions) {

//获取已经拉取的消息,completedFetches 保存的是从服务器拉下来的消息

        Iterator<CompletedFetch> completedFetchesItr = completedFetches.iterator();

        while (completedFetchesItr.hasNext()) {

            CompletedFetch records = completedFetchesItr.next();

            TopicPartition tp = records.partition;

//如果不包含在订阅的topic中,消息清理

            if (!assignedPartitions.contains(tp)) {

                records.drain();

                completedFetchesItr.remove();

            }

        }

//nextInLineFetch 是给消费者获取的消息,当nextInLineFetch还存在其他topic的,进行清理

        if (nextInLineFetch != null && !assignedPartitions.contains(nextInLineFetch.partition)) {

            nextInLineFetch.drain();

            nextInLineFetch = null;

        }

    }

订阅topic

订阅topic主要实现的逻辑是:注册监听器,设置监听模式,修改订阅topic数据,如果topic没有发生改变,那么就不更新topic的相关数据,如果发生改变,置needPartialUpdate标志位为true,等待下一轮的更新。具体的实现逻辑如下:

#SubscriptionState.subscribe

//监听topic,返回是否发生改变

public synchronized boolean subscribe(Set<String> topics, ConsumerRebalanceListener listener) {

        registerRebalanceListener(listener);

        setSubscriptionType(SubscriptionType.AUTO_TOPICS);

        return changeSubscription(topics);

    }

//进行对比

private boolean changeSubscription(Set<String> topicsToSubscribe) {

        if (subscription.equals(topicsToSubscribe))

            return false;

 

        subscription = topicsToSubscribe;

        return true;

    }

//下次请求设置为立即更新

public synchronized int requestUpdateForNewTopics() {

        // Override the timestamp of last refresh to let immediate update.

        this.lastRefreshMs = 0;

        this.needPartialUpdate = true;

        this.requestVersion++;

        return this.updateVersion;

    }

除了上面使用传入的topic,kafka还支持正则匹配的传入topic,监听的逻辑和上面类似,有兴趣的同学可以去看下。

 

分配模式(assign)

和上面的订阅模式不同,分配模式需要传入的是topic和partition,指定消费者消费哪个分区,之后也不会在发生重平衡等操作。分配模式的实现逻辑和订阅模式都部分类似,主要的区别在于需要在进行分配之前将之前已经拉取的消息提交offset值,这么做的原因主要是由于后续不会重平衡,要保证自动提交的情况下已经拉取的消息提交了offset,防止消息重复消费。以下是这段代码的实现逻辑:

KafkaConsumer#assign

public void assign(Collection<TopicPartition> partitions) {

        acquireAndEnsureOpen();

        try {

//异常处理

           //...

   fetcher.clearBufferedDataForUnassignedPartitions(partitions);

   // 提交offset

   if (coordinator != null)

   this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());

//分配topic和partition

if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))

//更新元数据

metadata.requestUpdateForNewTopics();

        } finally {

            release();

        }

    }

//开启自动提交时,提交offset

public void maybeAutoCommitOffsetsAsync(long now) {

//开启自动提交

        if (autoCommitEnabled) {

//更新提交时间

            nextAutoCommitTimer.update(now);

//提交时间已经过期

            if (nextAutoCommitTimer.isExpired()) {

//重置下一次提交时间为自动 提交的间隔时间

                nextAutoCommitTimer.reset(autoCommitIntervalMs);

//执行异步提交offset

                doAutoCommitOffsetsAsync();

            }

        }

    }

//指定订阅分区

 public synchronized boolean assignFromUser(Set<TopicPartition> partitions) {

 //设置订阅模式

        setSubscriptionType(SubscriptionType.USER_ASSIGNED);

//如果订阅的和当前一致

        if (this.assignment.partitionSet().equals(partitions))

            return false;

        // 更新订阅的topic

//...查找逻辑

        this.assignment.set(partitionToState);

//设置监听的topic并返回是否需要更新数据

        return changeSubscription(manualSubscribedTopics);

    }

 

取消订阅

在上面的逻辑中,如果发现需要订阅的topic为空的话,就是取消之前所有订阅的topic数据。取消订阅的函数中需要做几个操作:fetcher清空数据、subscriptions清空数据、coordinator做离开集群的操作。具体的实现如下:

KafkaConsumer#unsubscribe

public void unsubscribe() {

//加锁

        acquireAndEnsureOpen();

        try {

//fetcher置空

          fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());

            if (this.coordinator != null) {

//coordinator发送leave准备,这个是指coordinator要离开这个集群

                this.coordinator.onLeavePrepare();

//coordinator发送leaveGroup

                this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");

            }

//subscriptions 设置取消订阅的相关信息

            this.subscriptions.unsubscribe();

        } finally {

//是否锁

            release();

        }

    }

//清空所有的数据

public synchronized void unsubscribe() {

        this.subscription = Collections.emptySet();

        this.groupSubscription = Collections.emptySet();

        this.assignment.clear();

        this.subscribedPattern = null;

        this.subscriptionType = SubscriptionType.NONE;

        this.assignmentId++;

    }

coordinator的相关操作我们后续在介绍,协调者对于消费者来说是一个很核心的内容。

本文主要介绍了消费者监听消息的模式,主要有两种模式:订阅和分配,订阅模式支持topic级别和正则表达式级别的订阅。两种的核心区别是订阅分自平衡,分配模式需要指定对应的分区。当订阅的topic为空的时候,就是取消所有订阅的topic。

 

本文的内容就这么多,如果你觉得对你的学习和面试有些帮助,帮忙点个赞或者转发一下哈,谢谢。


文章转载自我的IT技术路,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论