暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Kafka consumer_offsets 主题深度剖析

大数据技能圈 2025-02-24
57
在 Apache Kafka 的消息消费机制中,确保消息被可靠消费是一个核心问题。为了解决这个问题,Kafka 设计了一个特殊的内部主题 consumer_offsets,用于跟踪和管理消费者组的消费进度。本文将深入剖析这个特殊主题的工作原理和实现机制。

01

consumer_offsets 的基本概念

consumer_offsets 是 Kafka 的一个内部主题,它具有以下特征:
1. 默认包含 50 个分区(可通过 offsets.topic.num.partitions 配置)
2. 使用 3 个副本因子(可通过 offsets.topic.replication.factor 配置)
3. 采用日志压缩(log compaction)的清理策略
4. 消息格式为二进制的键值对
这个主题存储了所有消费者组的位移信息。每个消费者组消费某个主题分区时,都会定期将自己的消费位置(offset)提交到这个主题中。当消费者重启或发生再平衡时,可以从这个主题中恢复之前的消费位置,确保消息不会丢失或重复消费。
让我们通过代码来演示如何实现消费者位移的提交和管理:
    public class ConsumerOffsetDemo {
    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    private final String groupId;

    public ConsumerOffsetDemo(String bootstrapServers, String topic, String groupId) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    StringDeserializer.class.getName());
    // 关闭自动提交,手动控制位移提交
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    this.consumer = new KafkaConsumer<>(props);
    this.topic = topic;
    this.groupId = groupId;
    }

    public void consumeAndCommit() {
    try {
    consumer.subscribe(Collections.singletonList(topic));

    while (true) {
    ConsumerRecords<String, String> records =
    consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
    // 处理消息
    processRecord(record);

    // 手动提交单条消息的位移
    Map<TopicPartition, OffsetAndMetadata> offsets =
    Collections.singletonMap(
    new TopicPartition(record.topic(), record.partition()),
    new OffsetAndMetadata(record.offset() + 1)
    );
    consumer.commitSync(offsets);
    }
    }
    } finally {
    consumer.close();
    }
    }
    }

    02

    位移提交机制

    位移提交是 consumer_offsets 主题的核心功能。当消费者消费消息时,需要定期将自己的消费进度提交到这个主题。提交的消息包含以下信息:
    1. key:包含 <消费者组ID, 主题名称, 分区号> 的三元组
    2. value:包含 offset(位移)、timestamp(时间戳)等信息
    提交方式分为自动提交和手动提交:
    1. 自动提交:由消费者自动定期提交,通过 auto.commit.interval.ms 配置提交间隔
    2. 手动提交:由应用程序控制提交时机,可以选择同步提交或异步提交
    下面是一个完整的位移监控实现:
      public class OffsetMonitor {
      private final AdminClient adminClient;
      private final KafkaConsumer<byte[], byte[]> consumer;

      public OffsetMonitor(String bootstrapServers) {
      Properties props = new Properties();
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
      this.adminClient = AdminClient.create(props);

      props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-monitor");
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      ByteArrayDeserializer.class.getName());
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      ByteArrayDeserializer.class.getName());
      this.consumer = new KafkaConsumer<>(props);
      }

      public Map<String, ConsumerGroupOffset> getConsumerGroupOffsets(String groupId) {
      Map<String, ConsumerGroupOffset> result = new HashMap<>();

      try {
      // 获取消费者组的位移信息
      ListConsumerGroupOffsetsResult offsetsResult =
      adminClient.listConsumerGroupOffsets(groupId);
      Map<TopicPartition, OffsetAndMetadata> offsets =
      offsetsResult.partitionsToOffsetAndMetadata().get();

      // 获取主题的结束位移
      Map<TopicPartition, Long> endOffsets =
      consumer.endOffsets(offsets.keySet());

      // 计算消费延迟
      for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
      TopicPartition tp = entry.getKey();
      long committedOffset = entry.getValue().offset();
      long endOffset = endOffsets.get(tp);
      long lag = endOffset - committedOffset;

      result.put(tp.topic(), new ConsumerGroupOffset(
      committedOffset, endOffset, lag));
      }
      } catch (Exception e) {
      e.printStackTrace();
      }

      return result;
      }
      }

      03

      位移管理和运维

      在实际运维中,我们需要对 consumer_offsets 主题进行管理和监控。主要包括以下几个方面:
      1. 位移重置:当需要重新消费某个主题的消息时,可以重置消费者组的位移
      2. 消费者组管理:包括删除不再使用的消费者组等操作
      3. 监控告警:监控消费延迟,及时发现消费异常
      下面是一个位移管理工具的实现:
        public class OffsetManager {
        private final AdminClient adminClient;

        public OffsetManager(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        this.adminClient = AdminClient.create(props);
        }

        // 重置消费者组位移
        public void resetOffset(String groupId, String topic, int partition, long offset) {
        try {
        TopicPartition tp = new TopicPartition(topic, partition);
        Map<TopicPartition, OffsetAndMetadata> offsetMap =
        Collections.singletonMap(tp, new OffsetAndMetadata(offset));

        adminClient.alterConsumerGroupOffsets(groupId, offsetMap).all().get();

        System.out.printf("Successfully reset offset for group=%s, topic=%s, " +
        "partition=%d to %d%n",
        groupId, topic, partition, offset);
        } catch (Exception e) {
        e.printStackTrace();
        }
        }

        // 删除消费者组
        public void deleteConsumerGroup(String groupId) {
        try {
        adminClient.deleteConsumerGroups(Collections.singleton(groupId)).all().get();
        System.out.printf("Successfully deleted consumer group: %s%n", groupId);
        } catch (Exception e) {
        e.printStackTrace();
        }
        }

        // 监控消费延迟
        public void monitorConsumerLag(String groupId, String topic) {
        try {
        TopicPartition tp = new TopicPartition(topic, 0);
        Map<TopicPartition, OffsetAndMetadata> offsetMap =
        adminClient.listConsumerGroupOffsets(groupId)
        .partitionsToOffsetAndMetadata().get();

        long currentOffset = offsetMap.get(tp).offset();
        long endOffset = getEndOffset(tp);
        long lag = endOffset - currentOffset;

        if (lag > 10000) { // 设置告警阈值
        System.out.printf("Warning: High lag detected for group=%s, topic=%s: %d%n",
        groupId, topic, lag);
        }
        } catch (Exception e) {
        e.printStackTrace();
        }
        }

        private long getEndOffset(TopicPartition tp) {
        try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(new Properties())) {
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(
        Collections.singleton(tp));
        return endOffsets.get(tp);
        }
        }
        }
        consumer_offsets 主题是 Kafka 消息消费机制的核心组件,它通过存储和管理消费位移信息,确保了消息消费的可靠性和可恢复性。

        04

        加群请添加作者

        05

        获取文档资料

        欢迎关注我的公众号【大数据技能圈】,获取更多大数据技术干货!

        推荐阅读系列文章

        如果喜欢 请点个在看分享给身边的朋友


        文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

        评论