暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

一文搞懂kafka的consumer流程

阿斌Java之路 2022-07-04
2625

消费模式

常见的消费模式有两种:

poll(拉)
:消费者主动向服务端拉取消息。

push(推)
:服务端主动推送消息给消费者。

由于推模式很难考虑到每个客户端不同的消费速率,因此kafka采用的是poll的模式,该模式有个缺点,如果服务端没有消息,消费端就会一直空轮询。为了避免过多不必要的空轮询,kafka做了改进,如果没消息服务端就会暂时保持该请求,在一段时间内有消息再回应给客户端。

消费者组原理

Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。

  • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
  • 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
img

对千消息中间件而言,一般有两种消息投递模式:点对点(P2P, Point-to-Point)模式和发布/订阅(Pub/Sub)模式。点对点模式是基于队列的,消息生产者发送消息到队列,消息消费 者从队列中接收消息。发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节 点称为主题(Topic) , 主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题, 而消息订阅者从主题中订阅消息。主题使得消息的订阅者和发布者互相保持独立,不需要进行 接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。Kaflca同时支待两种消 息投递模式,而这正是得益千消费者与消费组模型的契合:

  • 如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一 个消费者,即每条消息只会被一个消费者处理,这就相当千点对点模式的应用。
  • 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费 者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用。

具体的消费者组初始化流程:

img

分区分配策略

我们知道一个 Consumer Group 中有多个 Consumer,一个 Topic 也有多个 Partition,所以必然会涉及到 Partition 的分配问题: 确定哪个 Partition 由哪个 Consumer 来消费的问题。

Kafka 客户端提供了3 种分区分配策略:RangeAssignorRoundRobinAssignorStickyAssignor,前两种 分配方案相对简单一些StickyAssignor分配方案相对复杂一些。

Range:

img

RoundRobin:

img

Sticky:

StickyAssignor 分区分配算法是 Kafka Java 客户端提供的分配策略中最复杂的一种,可以通过 partition.assignment.strategy 参数去设置,从 0.11 版本开始引入,目的就是在执行新分配时,尽量在上一次分配结果上少做调整,其主要实现了以下2个目标:

1)、Topic Partition 的分配要尽量均衡。

2)、当 Rebalance(重分配,后面会详细分析) 发生时,尽量与上一次分配结果保持一致。

该算法的精髓在于,重分配后,还能尽量与上一次结果保持一致,进而达到消费者故障下线,故障恢复后的均衡问题,在此就不举例了。

位移提交

消费者提交的offset值维护在**__consumer_offsets这个Topic中,具体维护在哪个分区中,是由消费者所在的消费者组groupid**决定,计算方式是:groupid的hashCode值对50取余。当kafka环境正常而消费者不能消费时,有可能是对应的__consumer_offsets分区leader为none或-1,或者分区中的日志文件损坏导致。

__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+ 分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行 compact,也就是每个 group.id+topic+分区号就保留最新数据。

一般情况下, 当集群中第一次有消费者消费消息时会自动创建主题_ consumer_ offsets, 不过它的副本因子还受offsets.topic .replication.factor参数的约束,这个参数的默认值为3 (下载安装的包中此值可能为1),分区数可以通过offsets.topic.num.partitions参数设置,默认为50。

消费者提交offset的方式有两种,自动提交手动提交

自动提交

为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。

  • enable.auto.commit
    :是否开启自动提交offset功能,默认是true
  • auto.commit.interval.ms
    :自动提交offset的时间间隔,默认是5s

自动提交有可能出现消息消费失败,但是却提交了offset的情况,导致消息丢失。为了能够实现消息消费offset的精确控制,更推荐手动提交。

手动提交

虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因 此Kafka还提供了手动提交offset的API。手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相 同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成 功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故 有可能提交失败

  • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
  • commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。

指定消费位置

在kafka中当消费者查找不到所记录的消费位移时,会根据auto.offset.reset的配置,决定从何处消费。

auto.offset.reset = earliest | latest | none
默认是 latest。

  • earliest
    :自动将偏移量重置为最早的偏移量,--from-beginning。
  • latest
    (默认值):自动将偏移量重置为最新偏移量
  • none
    :如果未找到消费者组的先前偏移量,则向消费者抛出异常。

Kafka中的消费位移是存储在一个内部主题中的, 而我们可以使用**seek()**方法可 以突破这一限制:消费位移可以保存在任意的存储介质中, 例如数据库、 文件系统等。以数据 库为例, 我们将消费位移保存在其中的一个表中, 在下次消费的时候可以读取存储在数据表中 的消费位移并通过seek()方法指向这个具体的位置 。

指定位移消费

// 指定位置进行消费
Set<TopicPartition> assignment = kafkaConsumer.assignment();

// 保证分区分配方案已经制定完毕
while (assignment.size() == 0){
kafkaConsumer.poll(Duration.ofSeconds(1));

assignment = kafkaConsumer.assignment();
}

// 指定消费的offset
for (TopicPartition topicPartition : assignment) {
kafkaConsumer.seek(topicPartition,600);
}

// 3 消费数据
while (true){

ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {

System.out.println(consumerRecord);
}
}

指定时间消费

原理就是查到时间对应的offset再去指定位移消费,为了确保同步到分区信息,我们还需要确保能获取到分区,再去查询分区时间

// 指定位置进行消费
Set<TopicPartition> assignment = kafkaConsumer.assignment();

// 保证分区分配方案已经制定完毕
while (assignment.size() == 0){
kafkaConsumer.poll(Duration.ofSeconds(1));

assignment = kafkaConsumer.assignment();
}

// 希望把时间转换为对应的offset
HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<>();

// 封装对应集合
for (TopicPartition topicPartition : assignment) {
topicPartitionLongHashMap.put(topicPartition,System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}

Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap);

// 指定消费的offset
for (TopicPartition topicPartition : assignment) {

OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);

kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());
}

// 3 消费数据
while (true){

ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {

System.out.println(consumerRecord);
}
}
}

拦截器

与生产者对应,消费者也有拦截器。我们来看看拦截器具体的方法。

public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {

ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);

void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

void close();
}

KaflcaConsumer会在poll()方法返回之前调用拦截器的onConsume()方法来对消息进行相应 的定制化操作,比如修改返回的消息内容按照某种规则过滤消息(可能会减少poll()方法返回 的消息的个数)。如果onConsume()方法中抛出异常, 那么会被捕获并记录到日志中, 但是异 常不会再向上传递。

KaflcaConsumer会在提交完消费位移之后调用拦截器的**onCommit()**方法, 可以使用这个方 法来记录跟踪所提交的位移信息,比如当消费者使用cornmitSync的无参方法时,我们不知道提 交的消费位移的具体细节, 而使用拦截器的onCommit()方法却可以做到这 一点。




END



后台回复关键词 kafka 获取今日推荐资料

微信8.0新增了一万的好友数,之前没加上好友的可以加一下我的个人微信,再晚又满了,一起抱团取暖结伴内卷。



扫码拉群,学习打卡,交流经验


每周一读




文章转载自阿斌Java之路,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论