01
分区(Partition)机制详解
// 分区消费示例代码public class KafkaPartitionConsumerDemo {private static final String BOOTSTRAP_SERVERS = "192.168.241.128:9092";private static final String TOPIC_NAME = "partition_demo_topic";public static void main(String[] args) {Properties props = new Properties();// 配置基本参数props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, "partition_demo_group");try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {// 手动分配分区TopicPartition partition0 = new TopicPartition(TOPIC_NAME, 0);TopicPartition partition1 = new TopicPartition(TOPIC_NAME, 1);consumer.assign(Arrays.asList(partition0, partition1));// 设置分区起始偏移量consumer.seekToBeginning(Arrays.asList(partition0, partition1));// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Partition: %d, Offset: %d, Key: %s, Value: %s%n",record.partition(), record.offset(), record.key(), record.value());}}}}}
02
消费者组(Consumer Group)深入解析
消费者组是 Kafka 实现消息消费的核心机制,它提供了消息消费的可扩展性和故障容错能力。一个消费者组由一个或多个消费者实例组成,这些消费者共同消费订阅主题的消息。消费者组的设计遵循一个基本原则:在同一时刻,一个分区只能被同一个消费者组中的一个消费者消费,但一个消费者可以同时消费多个分区。这种设计既实现了组内消费者之间的负载均衡,又避免了同一组内多个消费者同时消费同一分区而导致的重复消费(注意:在再平衡或位移提交失败时仍可能出现重复消费,需要配合合理的位移管理来处理)。
public class KafkaConsumerGroupDemo {private static final String BOOTSTRAP_SERVERS = "192.168.241.128:9092";private static final String TOPIC_NAME = "group_demo_topic";private static final String GROUP_ID = "group_demo";public static void main(String[] args) {// 启动多个消费者实例int consumerCount = 3;CountDownLatch latch = new CountDownLatch(consumerCount);for (int i = 0; i < consumerCount; i++) {final int consumerId = i;new Thread(() -> {try {runConsumer(consumerId);} finally {latch.countDown();}}).start();}}private static void runConsumer(int consumerId) {Properties props = new Properties();// 配置消费者组参数props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-" + consumerId);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {// 订阅主题consumer.subscribe(Collections.singletonList(TOPIC_NAME), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {System.out.printf("Consumer-%d: Partitions revoked: %s%n", consumerId, partitions);}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {System.out.printf("Consumer-%d: Partitions assigned: %s%n", consumerId, partitions);}});// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {processRecord(consumerId, record);}}}}}
这个示例展示了如何创建一个消费者组,并启动多个消费者实例共同消费消息。通过实现 ConsumerRebalanceListener 接口,我们可以监控分区的分配和撤销过程,这在需要在再平衡前后执行特定操作(如保存位移)时非常有用。在实际应用中,消费者组的配置需要根据业务场景和性能需求来调整,比如会话超时时间、心跳间隔等参数都会影响消费者组的行为和性能。
03
位移管理(Offset Management)详解
public class KafkaOffsetManagementDemo {private static final String BOOTSTRAP_SERVERS = "192.168.241.128:9092";private static final String TOPIC_NAME = "offset_demo_topic";private static final String GROUP_ID = "offset_demo_group";public static void main(String[] args) {Properties props = new Properties();// 配置手动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {consumer.subscribe(Collections.singletonList(TOPIC_NAME));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);for (ConsumerRecord<String, String> record : partitionRecords) {processRecord(record);}// 获取最后一条消息的位移long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();// 提交位移Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1));try {consumer.commitSync(offsetsToCommit);System.out.printf("Committed offset %d for partition %s%n",lastOffset + 1, partition);} catch (CommitFailedException e) {// 处理提交失败handleCommitFailure(partition, lastOffset, e);}}}}}private static void handleCommitFailure(TopicPartition partition, long offset, Exception e) {// 实现重试逻辑System.err.printf("Failed to commit offset %d for partition %s: %s%n",offset, partition, e.getMessage());// 可以选择重试提交或者记录错误}private static void processRecord(ConsumerRecord<String, String> record) {// 处理消息的业务逻辑System.out.printf("Processing record: Partition=%d, Offset=%d, Value=%s%n",record.partition(), record.offset(), record.value());}}
这个示例展示了如何实现手动位移提交,包括同步提交和异常处理。在实际应用中,位移管理策略需要根据业务的可靠性要求来选择。对于要求高可靠性的场景,建议使用手动提交,并实现适当的重试机制和错误处理逻辑。
04
再平衡(Rebalance)机制详解
public class KafkaRebalanceDemo {private static final String BOOTSTRAP_SERVERS = "192.168.241.128:9092";private static final String TOPIC_NAME = "rebalance_demo_topic";private static final String GROUP_ID = "rebalance_demo_group";private static final Logger logger = LoggerFactory.getLogger(KafkaRebalanceDemo.class);public static void main(String[] args) {Properties props = new Properties();// 基础配置props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");// 再平衡相关配置props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,CooperativeStickyAssignor.class.getName());try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {consumer.subscribe(Collections.singletonList(TOPIC_NAME), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {logger.info("Rebalance started - Partitions revoked: {}", partitions);RebalanceMonitor.onRebalanceStart();// 保存消费位移for (TopicPartition partition : partitions) {long position = consumer.position(partition);saveOffsets(partition, position);}}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {logger.info("Rebalance completed - Partitions assigned: {}", partitions);RebalanceMonitor.onRebalanceComplete();// 恢复消费位移for (TopicPartition partition : partitions) {long savedOffset = getSavedOffset(partition);if (savedOffset >= 0) {consumer.seek(partition, savedOffset);}}}});// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));processRecordsWithRetry(records, consumer);}}}private static void processRecordsWithRetry(ConsumerRecords<String, String> records,KafkaConsumer<String, String> 
consumer) {int retries = 3;boolean processed = false;while (!processed && retries > 0) {try {for (ConsumerRecord<String, String> record : records) {processRecord(record);}consumer.commitSync();processed = true;} catch (Exception e) {logger.error("Error processing records. Retries left: {}", --retries, e);if (retries == 0) {// 处理最终失败的情况handleProcessingFailure(records);}}}}}
// 1. 静态成员配置props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,"consumer-" + UUID.randomUUID().toString());// 2. 增量式再平衡配置props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,CooperativeStickyAssignor.class.getName());// 3. 再平衡监控实现public class RebalanceMonitor {private static final AtomicLong rebalanceCount = new AtomicLong(0);private static final AtomicLong totalRebalanceDuration = new AtomicLong(0);private static long lastRebalanceTime = 0;public static void onRebalanceStart() {lastRebalanceTime = System.currentTimeMillis();rebalanceCount.incrementAndGet();logger.info("Rebalance started. Total count: {}", rebalanceCount.get());}public static void onRebalanceComplete() {long duration = System.currentTimeMillis() - lastRebalanceTime;totalRebalanceDuration.addAndGet(duration);double avgDuration = (double) totalRebalanceDuration.get() rebalanceCount.get();logger.info("Rebalance completed in {} ms. Average duration: {} ms",duration, avgDuration);// 检查再平衡健康状况checkRebalanceHealth(duration, avgDuration);}private static void checkRebalanceHealth(long duration, double avgDuration) {if (duration > 10000 || avgDuration > 5000) {logger.warn("Rebalance performance degradation detected!");// 触发告警alertRebalancePerformance(duration, avgDuration);}}}
06
最佳实践总结
Properties props = new Properties();// 基础网络配置props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-" + UUID.randomUUID());// 性能相关配置props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024"); // 最小抓取大小props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500"); // 最大等待时间props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500"); // 单次拉取最大记录数props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576"); // 分区获取大小// 可靠性配置props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 手动提交props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // 最大轮询间隔props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); // 会话超时props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000"); // 心跳间隔
public class KafkaConsumerWithErrorHandling {private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWithErrorHandling.class);public void consume() {try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {consumer.subscribe(topics);while (true) {try {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));// 处理消息for (ConsumerRecord<String, String> record : records) {try {processRecord(record);} catch (Exception e) {// 单条消息处理异常handleSingleRecordError(record, e);continue;}}// 提交位移try {consumer.commitSync();} catch (CommitFailedException e) {handleCommitError(e);}} catch (WakeupException e) {// 处理优雅关闭handleWakeup();break;} catch (Exception e) {// 处理其他异常handleConsumerError(e);}}}}private void handleSingleRecordError(ConsumerRecord<String, String> record, Exception e) {logger.error("Error processing record: {}", record, e);// 实现死信队列处理sendToDeadLetterQueue(record);}private void handleCommitError(CommitFailedException e) {logger.error("Failed to commit offsets", e);// 实现重试逻辑retryCommit();}}
监控关键指标(消费延迟、处理时间、错误率等);实现优雅关闭机制;使用死信队列处理失败消息;实现适当的重试策略;保持完善的日志记录。
07
加群请添加作者

08
获取文档及视频资料

推荐阅读系列文章
建议收藏 | Dinky系列总结篇 建议收藏 | Flink系列总结篇 建议收藏 | Flink CDC 系列总结篇 建议收藏 | Doris实战文章合集 建议收藏 | Seatunnel 实战文章系列合集 建议收藏 | 实时离线数仓(数据湖)总结篇 建议收藏 | 实时离线数仓实战第一阶段总结
如果喜欢,请点个“在看”并分享给身边的朋友
文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




