01
幂等性(Idempotence)概述

// 开启幂等性的配置示例Properties props = new Properties();props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);// 当启用幂等性时,以下配置会被自动设置// acks=all// retries=Integer.MAX_VALUE// max.in.flight.requests.per.connection=5
01
幂等性实现原理
每个生产者会被分配一个 PID(Producer ID)
每条消息会附带一个序列号(Sequence Number)
Broker 端会维护 <PID, 分区> 对应的序列号
如果新消息序列号不大于已提交的最大序列号,则会被视为重复消息并丢弃
02
幂等性的限制
只能保证单个生产者会话内的幂等性
只能保证单分区内的幂等性
跨会话、跨分区的幂等性需要使用事务特性
02
事务特性(Transactions)
事务可以保证多条消息要么全部成功要么全部失败,同时还支持跨分区和会话的幂等性。

// 事务生产者配置示例Properties props = new Properties();props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");// 必须启用幂等性props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);// 事务生产者代码示例KafkaProducer<String, String> producer = new KafkaProducer<>(props);producer.initTransactions(); 初始化事务try {producer.beginTransaction(); 开始事务发送多条消息producer.send(record1);producer.send(record2);producer.commitTransaction(); // 提交事务} catch (Exception e) {producer.abortTransaction(); // 异常时回滚事务} finally {producer.close();}
01
事务实现原理
负责管理事务的状态
维护事务日志(transaction log)
协调事务的提交和回滚
1.2. 事务状态
AddPartitionsToTxnRequest -> 添加分区到事务ProduceRequest -> 发送消息EndTxnRequest -> 结束事务(提交/回滚)
1.3 事务保证
原子性:多条消息要么全部成功,要么全部失败
隔离性:未提交的事务对消费者不可见
持久性:已提交的事务不会丢失
02
事务的使用场景
2.1 消息处理链
// 消费-处理-生产模式producer.beginTransaction();try {// 消费消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));// 处理消息for (ConsumerRecord<String, String> record : records) {// 处理逻辑producer.send(new ProducerRecord<>("output-topic", processedValue));}// 提交消费位移和生产消息producer.sendOffsetsToTransaction(offsets, groupId);producer.commitTransaction();} catch (Exception e) {producer.abortTransaction();}
2.1 跨分区原子性操作
producer.beginTransaction();try {// 向多个分区发送消息producer.send(new ProducerRecord<>("topic1", "key1", "value1"));producer.send(new ProducerRecord<>("topic2", "key2", "value2"));producer.commitTransaction();} catch (Exception e) {producer.abortTransaction();}
03
性能考虑
幂等性的性能影响
额外的序列号检查开销
服务端需要维护更多状态
通常影响很小(<5%)
事务的性能影响
需要额外的事务协调开销
引入了更多的网络往返
建议只在必要时使用事务
04
最佳实践
// 推荐的基本配置props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);props.put(ProducerConfig.ACKS_CONFIG, "all");props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
// 事务超时设置props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000); // 60秒// 事务ID要具有唯一性props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-" + UUID.randomUUID());
05
加群请添加作者

06
获取文档及视频资料

推荐阅读系列文章
建议收藏 | Dinky系列总结篇 建议收藏 | Flink系列总结篇 建议收藏 | Flink CDC 系列总结篇 建议收藏 | Doris实战文章合集 建议收藏 | Seatunnel 实战文章系列合集 建议收藏 | 实时离线输数仓(数据湖)总结篇 建议收藏 | 实时离线数仓实战第一阶段总结
如果喜欢 请点个在看分享给身边的朋友
文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




