对于 Kafka 如何保证数据可靠不丢失的问题,很多人认为只要设置 acks 参数为 -1(或者 all)就可以实现,其实不然。下面通过分析 acks = -1 的作用机制来说明为什么无法保证消息绝对可靠。
public static void main(String[] args) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata02:9092");props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer01");props.put(ProducerConfig.ACKS_CONFIG,"-1");//设置序列化的类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//初始化生产者KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);...}
注:关于生产者客户端的参数,建议使用 ProducerConfig 类的字符串常量,可以避免手动输入时拼写错误,该类中定义了生产者客户端可能用到的所有参数,并对每个参数的含义进行了解释。同理,消费者客户端的参数可以通过 ConsumerConfig 类来获取。
acks 参数的作用机制
当 Broker 端收到生产者客户端的写数据请求,会调用 ReplicaManager的appendRecords() 方法来执行写数据操作,该方法签名如下:
def appendRecords(timeout: Long,//请求处理的超时时间requiredAcks: Short,//请求acks设置internalTopicsAllowed: Boolean,//是否允许写入内部主题isFromClient: Boolean,//写入方来源entriesPerPartition: Map[TopicPartition, MemoryRecords],//待写入消息responseCallback: Map[TopicPartition, PartitionResponse] => Unit,//回调逻辑delayedProduceLock: Option[Lock] = None,//用来保护消费者组操作线程安全的锁对象//消息格式转换操作的回调统计逻辑,主要用于统计消息格式转换操作过程中的一些数据指标,比如总共转换了多少条消息,花费多长时间等recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ())...
if (isValidRequiredAcks(requiredAcks)) {val sTime = time.milliseconds//TODO 数据追加到本地日志val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,isFromClient = isFromClient, entriesPerPartition, requiredAcks)...
//是否需要等待其它副本完成写入,通过delayedProduceRequestRequired方法来判断写入是否成功,//如果为true,表示需要等待其它副本同步完成消息写入if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)//创建DelayedProduce延时请求对象val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq//再一次尝试完成该延时请求//如果暂时无法完成,则将对象放入到相应的Purgatory中等待后续处理delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
//是否需要手动刷盘。一般情况下不需要设置Broker端参数log.flush.interval.messages//刷盘操作交由操作系统来完成。但某些情况下,可以设置该参数来确保高可靠性//config.flushInterval默认值为Long.MaxValue,即默认这个分支条件不会满足,不执行flush方法if (unflushedMessages >= config.flushInterval)flush()
def startup() {if (scheduler != null) {info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))//定时清理超时的文件//清理间隔:由参数 log.retention.check.interval.ms 配置,默认5分钟scheduler.schedule("kafka-log-retention",cleanupLogs _,delay = InitialTaskDelayMs,period = retentionCheckMs,TimeUnit.MILLISECONDS)info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))//定时将内存中的数据刷写磁盘//时间间隔:由参数 log.flush.scheduler.interval.ms 配置,默认为Long.MAXVALUEscheduler.schedule("kafka-log-flusher",flushDirtyLogs _,delay = InitialTaskDelayMs,period = flushCheckMs,TimeUnit.MILLISECONDS)//定时更新检查点文件,kafka有时会重启,重启会依赖于检查点文件恢复数据//时间间隔:由参数 log.flush.offset.checkpoint.interval.ms 配置,默认60秒scheduler.schedule("kafka-recovery-point-checkpoint",checkpointLogRecoveryOffsets _,delay = InitialTaskDelayMs,period = flushRecoveryOffsetCheckpointMs,TimeUnit.MILLISECONDS)//定时更新Log起始偏移量检查点//时间间隔:由参数 log.flush.start.offset.checkpoint.interval.ms 配置,默认60秒scheduler.schedule("kafka-log-start-offset-checkpoint",checkpointLogStartOffsets _,delay = InitialTaskDelayMs,period = flushStartOffsetCheckpointMs,TimeUnit.MILLISECONDS)//清理待删除文件scheduler.schedule("kafka-delete-logs",deleteLogs _,delay = InitialTaskDelayMs,unit = TimeUnit.MILLISECONDS)}//默认为trueif (cleanerConfig.enableCleaner)cleaner.startup()}
min.insync.replicas
unclean.leader.election.enable
以上主要为服务端的可靠性保证,除此之外,还要考虑客户端生产者及消费者对可靠性的影响。
对于生产者:发送消息有三种模式:发后即忘、同步和异步。使用中要避免采用发后即忘的模式;对于同步或者异步模式,在出现异常时可以获得通知,并对异常采取相应的补救措施,比如重新进行发送等。以此来避免发送失败导致的数据丢失。
对于消费者:要确保生产者写入服务端的数据被正确消费,如果没有消费到,对于应用而言数据也是丢失的。enable.auto.commit 参数的默认值为 true,即开启消费偏移量自动提交的功能,自动提交编写简单,但可能带来重复消费或者消息丢失的问题。所以如果要提高可靠性,需要将该参数设置为 false 来执行手动提交偏移量,即消息只有成功消费后,才可以提交消费偏移量。如果部分消息一直不能被成功消费,为了不影响整体消费进度,可以将这类消息暂存到死信队列,以便后续故障排除。
文章转载自大数据记事本,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




