暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

kafka消费者--offset提交

我的IT技术路 2021-10-14
2613

消息中间件是每个做后台同学必须要掌握的一类框架,这主要取决于其广泛应用于互联网项目。消息中间件在这些系统中扮演着很重要的角色,它们的主要作用是消息异步,系统解耦,高并发削峰,分布式事务等等。目前主要的消息中间件有rabbitMQkafkarocketMQActiveMQ等,本系列文章总结的是kafka,也算是当前市面上比较流行的消息中间件,后续的文章会从kafka的生产者、消费者、broker等来总结。除了在实际应用中,消息中间件是一个常用的框架,在面试中,消息中间件也是必问内容。由于个人能力有限,文中难免有理解不到位的地方,还请留言指导,在此谢过。本系列文章kafka版本使用最新的2.8.0

 

Offset提交

在了解完之前的内容之后,应该说对消费者有了一个整体上的认识。之前我们解读了消费者的监听模式,心跳的发送,消息的拉取,消费者重平衡,本文会解析消费者在消费消息中一个最后一个环节:提交offset。在之前介绍kafka的时候有提到offset类似一个序列号,用来标识具体是哪条消息,Kafka通过offset来确定消费者已经处理完哪些消息了;另一个作用就是kafka的不同replica通过offset来确定同步情况,所以整个offset机制在kafka中使用非常广泛。

在消费者消费完消息之后,我们需要提交offset值,告诉服务端,这批消息已经处理,可以接着下一次处理。Kafka本身提供两种方式提交offset值,一种是自动提交,一种是手动提交从通信方式来看,kafka支持异步提交和同步提交,当然了kafka底层的通信都是异步的nio,同步是通过阻塞当前的调用等待结果返回实现的

 

手动同步提交

手动同步提交offset的方式是在kafka消费完消息之后进行处理的。调用接口在KafkaConsumer类中的commitSync方法。

 

    public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
    //加锁
    acquireAndEnsureOpen();
    try {
    maybeThrowInvalidGroupIdException();
    offsets.forEach(this::updateLastSeenEpochIfNewer);
    //提交offset值
    if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), time.timer(timeout))) {
    throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
    "committing offsets " + offsets);
    }
    } finally {
    //释放锁
    release();
    }
    }

    该方法的本质还是调用coordinator的commit方法,下面我们看下这个方法的具体实现逻辑。

     

      public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
      //异步提交完成offset回调callback
      invokeCompletedOffsetCommitCallbacks();
      //如果没有offset要提交直接返回
      if (offsets.isEmpty())
      return true;
      //发送主循环体
      do {
      //协调者未知或者连接不上直接返回失败
      if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
      return false;
      }
      //组装发送请求并唤醒客服端
      RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
      //发送并获取结果
      client.poll(future, timer);
      //如果poll后获取到之前异步提交的offset,可以直接回调callback
      invokeCompletedOffsetCommitCallbacks();
      //如果发送成功,返回成功结果
      if (future.succeeded()) {
      if (interceptors != null)
      interceptors.onCommit(offsets);
      return true;
      }
      //如果失败且不能重试的异常,直接抛出异常
      if (future.failed() && !future.isRetriable())
      throw future.exception();
      //如果可以重试的异常,就休眠一段时间进行重试
      timer.sleep(rebalanceConfig.retryBackoffMs);
      } while (timer.notExpired());
      //如果超时还没提交上去就返回失败
      return false;
      }

       

      上面的主循环体的逻辑比较简单:

      1) 发送提交请求并获取结果

      2) 将异步提交完成的offset进行回调

      3) 判断发送是否成功,成功就返回,不成功且可以重试的话,就进行重试。(其实这里是组装返回的结果

       

      在这里其实有一点是需要说明的,为什么刚开始进入的时候,开始回调invokeCompletedOffsetCommitCallbacks。这个主要是因为除了消费者线程之外,还有其他的线程也会获取到。进来就回调是为了先把已经完成的快速处理掉。那么在回调中完成的主要逻辑是:

        void invokeCompletedOffsetCommitCallbacks() {
        if (asyncCommitFenced.get()) {
        throw new FencedInstanceIdException("Get fenced exception for group.instance.id "
        + rebalanceConfig.groupInstanceId.orElse("unset_instance_id")
        + ", current member.id is " + memberId());
        }
        while (true) {
        //从队列里面处理已经完成提交的offset进行回调
        OffsetCommitCompletion completion = completedOffsetCommits.poll();
        //当队列为空时退出
        if (completion == null) {
        break;
        }
        completion.invoke();
        }
        }

        上面的逻辑是从completedOffsetCommits队列中获取,这是一个ConcurrentLinkedQueue队列,线程安全,因为除了消费者线程外,心跳线程也会处理该队列。这里是取出已经完成offset提交的消息,那是谁往里面添加的呢?

         

        Offset提交和处理

        completedOffsetCommits很显然是我们获取到提交的结果之后添加的,下面我们看下这段是怎么发送offset并且回调处理的。

          RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
          //offset 为空,直接返回
          if (offsets.isEmpty())
          return RequestFuture.voidSuccess();
          //获取协调者
          Node coordinator = checkAndGetCoordinator();
          if (coordinator == null)
          return RequestFuture.coordinatorNotAvailable();

          // 组装offset的数据格式
          Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopicDataMap = new HashMap<>();
          for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
          TopicPartition topicPartition = entry.getKey();
          OffsetAndMetadata offsetAndMetadata = entry.getValue();
          if (offsetAndMetadata.offset() < 0) {
          return RequestFuture.failure(new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset()));
          }

          OffsetCommitRequestData.OffsetCommitRequestTopic topic = requestTopicDataMap
          .getOrDefault(topicPartition.topic(),
          new OffsetCommitRequestData.OffsetCommitRequestTopic()
          .setName(topicPartition.topic())
          );

          topic.partitions().add(new OffsetCommitRequestData.OffsetCommitRequestPartition()
          .setPartitionIndex(topicPartition.partition())
          .setCommittedOffset(offsetAndMetadata.offset())
          .setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
          .setCommittedMetadata(offsetAndMetadata.metadata())
          );
          requestTopicDataMap.put(topicPartition.topic(), topic);
          }
          //获取版本号
          final Generation generation;
          if (subscriptions.hasAutoAssignedPartitions()) {
          generation = generationIfStable();
          //...异常处理并返回
          } else {
          generation = Generation.NO_GENERATION;
          }
          //组装发送参数
          OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
          new OffsetCommitRequestData()
          .setGroupId(this.rebalanceConfig.groupId)
          .setGenerationId(generation.generationId)
          .setMemberId(generation.memberId)
          .setGroupInstanceId(rebalanceConfig.groupInstanceId.orElse(null))
          .setTopics(new ArrayList<>(requestTopicDataMap.values()))
          );
          //发送到unset队列等待客服端发送
          return client.send(coordinator, builder)
          .compose(new OffsetCommitResponseHandler(offsets, generation));
          }

           

          看了这么多kafka的消息发送,我们应该知道消息接受处理是一个类,在这里我们看下OffsetCommitResponseHandler的handle方法。

          OffsetCommitResponseHandler#handle:

            public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
            //未认证的topic列表
            Set<String> unauthorizedTopics = new HashSet<>();
            //对每一个结果进行处理,主要根据不同的异常进行不同的处理
            for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
            for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
            TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
            OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);

            long offset = offsetAndMetadata.offset();

            Errors error = Errors.forCode(partition.errorCode());
            if (error == Errors.NONE) {
            log.debug("Committed offset {} for partition {}", offset, tp);
            } else {
            //异常处理
            //...
            }
            }
            }
            //抛出未认证异常
            if (!unauthorizedTopics.isEmpty()) {
            future.raise(new TopicAuthorizationException(unauthorizedTopics));
            } else {
            //返回成功
            future.complete(null);
            }
            }

            从上面的代码中发现并没有添加到callback队列,这是因为同步处理不需要callback,在异步处理的过程中,我们进行offset提交成功之后添加。

            异步提交具体的实现逻辑如下:

             

              private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
              //发送
              RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
              //创建callback
              final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
              //添加callback的监听器
              future.addListener(new RequestFutureListener<Void>() {
              @Override
              public void onSuccess(Void value) {
              if (interceptors != null)
              interceptors.onCommit(offsets);
              //发送成功,添加到callback中
              completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
              }

              @Override
              public void onFailure(RuntimeException e) {
              Exception commitException = e;

              if (e instanceof RetriableException) {
              commitException = new RetriableCommitFailedException(e);
              }
              //发送失败,也添加到callback中
              completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
              if (commitException instanceof FencedInstanceIdException) {
              asyncCommitFenced.set(true);
              }
              }
              });
              }

               

              自动提交offset

              上面的流程说明了以下主动提交的方式,kafka处理手动提交之外,还有自动提交的方式,自动提交需要开启enable.auto.commit为true。自动提交是在消费者每次poll的时候,调用coordinator的poll方法最后会调用自动提交入口函数maybeAutoCommitOffsetsAsync。

                public void maybeAutoCommitOffsetsAsync(long now) {
                //是否开启自动提交
                if (autoCommitEnabled) {
                nextAutoCommitTimer.update(now);
                if (nextAutoCommitTimer.isExpired()) {
                nextAutoCommitTimer.reset(autoCommitIntervalMs);
                //执行自动提交
                doAutoCommitOffsetsAsync();
                }
                }
                }
                private void doAutoCommitOffsetsAsync() {
                Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
                log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets);
                //异步提交offset
                commitOffsetsAsync(allConsumedOffsets, (offsets, exception) -> {
                if (exception != null) {
                if (exception instanceof RetriableCommitFailedException) {
                log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,
                exception);
                nextAutoCommitTimer.updateAndReset(rebalanceConfig.retryBackoffMs);
                } else {
                log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
                }
                } else {
                log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
                }
                });
                }

                本文主要介绍了消费者源码中最后一个环节--offset值提交,offset本身是kafka设计中一个非常重要的内容,涉及到几个重要的知识点,消息不丢失,主从复制,事务等。Offset类似数据库中的主键一样,唯一标示某个分区的位置,所以offset的理解也相对重要。不过,对于消费者而言,其只需要知道在消费完消息之后提交offset值就行,其他的是kakfa集群处理的逻辑。在消费者这里,我们重点讲述的就是offset的提交逻辑。

                本文的内容就这么多,如果你觉得对你的学习和面试有些帮助,帮忙点个赞或者转发一下哈,谢谢。

                文章转载自我的IT技术路,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                评论