消息中间件是每个做后台同学必须要掌握的一类框架,这主要取决于其广泛应用于互联网项目。消息中间件在这些系统中扮演着很重要的角色,它们的主要作用是消息异步,系统解耦,高并发削峰,分布式事务等等。目前主要的消息中间件有rabbitMQ、kafka、rocketMQ、ActiveMQ等,本系列文章总结的是kafka,也算是当前市面上比较流行的消息中间件,后续的文章会从kafka的生产者、消费者、broker等来总结。除了在实际应用中,消息中间件是一个常用的框架,在面试中,消息中间件也是必问内容。由于个人能力有限,文中难免有理解不到位的地方,还请留言指导,在此谢过。本系列文章kafka版本使用最新的2.8.0。
Offset提交
在了解完之前的内容之后,应该说对消费者有了一个整体上的认识。之前我们解读了消费者的监听模式,心跳的发送,消息的拉取,消费者重平衡,本文会解析消费者在消费消息中一个最后一个环节:提交offset。在之前介绍kafka的时候有提到offset类似一个序列号,用来标识具体是哪条消息,Kafka通过offset来确定消费者已经处理完哪些消息了;另一个作用就是kafka的不同replica通过offset来确定同步情况,所以整个offset机制在kafka中使用非常广泛。
在消费者消费完消息之后,我们需要提交offset值,告诉服务端,这批消息已经处理,可以接着下一次处理。Kafka本身提供两种方式提交offset值,一种是自动提交,一种是手动提交。从通信方式来看,kafka支持异步提交和同步提交,当然了kafka底层的通信都是异步的nio,同步是通过阻塞当前的调用等待结果返回实现的。
手动同步提交
手动同步提交offset的方式是在kafka消费完消息之后进行处理的。调用接口在KafkaConsumer类中的commitSync方法。
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {//加锁acquireAndEnsureOpen();try {maybeThrowInvalidGroupIdException();offsets.forEach(this::updateLastSeenEpochIfNewer);//提交offset值if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), time.timer(timeout))) {throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +"committing offsets " + offsets);}} finally {//释放锁release();}}
该方法的本质还是调用coordinator的commit方法,下面我们看下这个方法的具体实现逻辑。
public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {//异步提交完成offset回调callbackinvokeCompletedOffsetCommitCallbacks();//如果没有offset要提交直接返回if (offsets.isEmpty())return true;//发送主循环体do {//协调者未知或者连接不上直接返回失败if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {return false;}//组装发送请求并唤醒客服端RequestFuture<Void> future = sendOffsetCommitRequest(offsets);//发送并获取结果client.poll(future, timer);//如果poll后获取到之前异步提交的offset,可以直接回调callbackinvokeCompletedOffsetCommitCallbacks();//如果发送成功,返回成功结果if (future.succeeded()) {if (interceptors != null)interceptors.onCommit(offsets);return true;}//如果失败且不能重试的异常,直接抛出异常if (future.failed() && !future.isRetriable())throw future.exception();//如果可以重试的异常,就休眠一段时间进行重试timer.sleep(rebalanceConfig.retryBackoffMs);} while (timer.notExpired());//如果超时还没提交上去就返回失败return false;}
上面的主循环体的逻辑比较简单:
1) 发送提交请求并获取结果
2) 将异步提交完成的offset进行回调
3) 判断发送是否成功,成功就返回,不成功且可以重试的话,就进行重试。(其实这里是组装返回的结果)
在这里其实有一点是需要说明的,为什么刚开始进入的时候,开始回调invokeCompletedOffsetCommitCallbacks。这个主要是因为除了消费者线程之外,还有其他的线程也会获取到。进来就回调是为了先把已经完成的快速处理掉。那么在回调中完成的主要逻辑是:
void invokeCompletedOffsetCommitCallbacks() {if (asyncCommitFenced.get()) {throw new FencedInstanceIdException("Get fenced exception for group.instance.id "+ rebalanceConfig.groupInstanceId.orElse("unset_instance_id")+ ", current member.id is " + memberId());}while (true) {//从队列里面处理已经完成提交的offset进行回调OffsetCommitCompletion completion = completedOffsetCommits.poll();//当队列为空时退出if (completion == null) {break;}completion.invoke();}}
上面的逻辑是从completedOffsetCommits队列中获取,这是一个ConcurrentLinkedQueue队列,线程安全,因为除了消费者线程外,心跳线程也会处理该队列。这里是取出已经完成offset提交的消息,那是谁往里面添加的呢?
Offset提交和处理
completedOffsetCommits很显然是我们获取到提交的结果之后添加的,下面我们看下这段是怎么发送offset并且回调处理的。
RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {//offset 为空,直接返回if (offsets.isEmpty())return RequestFuture.voidSuccess();//获取协调者Node coordinator = checkAndGetCoordinator();if (coordinator == null)return RequestFuture.coordinatorNotAvailable();// 组装offset的数据格式Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopicDataMap = new HashMap<>();for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {TopicPartition topicPartition = entry.getKey();OffsetAndMetadata offsetAndMetadata = entry.getValue();if (offsetAndMetadata.offset() < 0) {return RequestFuture.failure(new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset()));}OffsetCommitRequestData.OffsetCommitRequestTopic topic = requestTopicDataMap.getOrDefault(topicPartition.topic(),new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topicPartition.topic()));topic.partitions().add(new OffsetCommitRequestData.OffsetCommitRequestPartition().setPartitionIndex(topicPartition.partition()).setCommittedOffset(offsetAndMetadata.offset()).setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH)).setCommittedMetadata(offsetAndMetadata.metadata()));requestTopicDataMap.put(topicPartition.topic(), topic);}//获取版本号final Generation generation;if (subscriptions.hasAutoAssignedPartitions()) {generation = generationIfStable();//...异常处理并返回} else {generation = Generation.NO_GENERATION;}//组装发送参数OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(new OffsetCommitRequestData().setGroupId(this.rebalanceConfig.groupId).setGenerationId(generation.generationId).setMemberId(generation.memberId).setGroupInstanceId(rebalanceConfig.groupInstanceId.orElse(null)).setTopics(new ArrayList<>(requestTopicDataMap.values())));//发送到unset队列等待客服端发送return client.send(coordinator, builder).compose(new OffsetCommitResponseHandler(offsets, generation));}
看了这么多kafka的消息发送,我们应该知道消息接受处理是一个类,在这里我们看下OffsetCommitResponseHandler的handle方法。
OffsetCommitResponseHandler#handle:
public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {//未认证的topic列表Set<String> unauthorizedTopics = new HashSet<>();//对每一个结果进行处理,主要根据不同的异常进行不同的处理for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);long offset = offsetAndMetadata.offset();Errors error = Errors.forCode(partition.errorCode());if (error == Errors.NONE) {log.debug("Committed offset {} for partition {}", offset, tp);} else {//异常处理//...}}}//抛出未认证异常if (!unauthorizedTopics.isEmpty()) {future.raise(new TopicAuthorizationException(unauthorizedTopics));} else {//返回成功future.complete(null);}}
从上面的代码中发现并没有添加到callback队列,这是因为同步处理不需要callback,在异步处理的过程中,我们进行offset提交成功之后添加。
异步提交具体的实现逻辑如下:
private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {//发送RequestFuture<Void> future = sendOffsetCommitRequest(offsets);//创建callbackfinal OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;//添加callback的监听器future.addListener(new RequestFutureListener<Void>() {@Overridepublic void onSuccess(Void value) {if (interceptors != null)interceptors.onCommit(offsets);//发送成功,添加到callback中completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));}@Overridepublic void onFailure(RuntimeException e) {Exception commitException = e;if (e instanceof RetriableException) {commitException = new RetriableCommitFailedException(e);}//发送失败,也添加到callback中completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));if (commitException instanceof FencedInstanceIdException) {asyncCommitFenced.set(true);}}});}
自动提交offset
上面的流程说明了以下主动提交的方式,kafka处理手动提交之外,还有自动提交的方式,自动提交需要开启enable.auto.commit为true。自动提交是在消费者每次poll的时候,调用coordinator的poll方法最后会调用自动提交入口函数maybeAutoCommitOffsetsAsync。
public void maybeAutoCommitOffsetsAsync(long now) {//是否开启自动提交if (autoCommitEnabled) {nextAutoCommitTimer.update(now);if (nextAutoCommitTimer.isExpired()) {nextAutoCommitTimer.reset(autoCommitIntervalMs);//执行自动提交doAutoCommitOffsetsAsync();}}}private void doAutoCommitOffsetsAsync() {Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets);//异步提交offsetcommitOffsetsAsync(allConsumedOffsets, (offsets, exception) -> {if (exception != null) {if (exception instanceof RetriableCommitFailedException) {log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,exception);nextAutoCommitTimer.updateAndReset(rebalanceConfig.retryBackoffMs);} else {log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());}} else {log.debug("Completed asynchronous auto-commit of offsets {}", offsets);}});}
本文主要介绍了消费者源码中最后一个环节--offset值提交,offset本身是kafka设计中一个非常重要的内容,涉及到几个重要的知识点,消息不丢失,主从复制,事务等。Offset类似数据库中的主键一样,唯一标示某个分区的位置,所以offset的理解也相对重要。不过,对于消费者而言,其只需要知道在消费完消息之后提交offset值就行,其他的是kakfa集群处理的逻辑。在消费者这里,我们重点讲述的就是offset的提交逻辑。
本文的内容就这么多,如果你觉得对你的学习和面试有些帮助,帮忙点个赞或者转发一下哈,谢谢。




