暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Kafka 如何保证消息的可靠性

大数据记事本 2021-04-05
830

    于 Kafka 如何保证数据可靠不丢失的问题,很多人认为只要设置 acks 参数为 -1(或者 all)就可以实现,其实不然。下面通过分析 acks = -1 的作用机制来说明为什么无法保证消息绝对可靠。

    首先明确一点:acks 是生产者客户端的一个参数,在初始化生产者对象 KafkaProducer 时进行配置,如:
    public static void main(String[] args) {
        Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata02:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer01");
    props.put(ProducerConfig.ACKS_CONFIG,"-1");
    //设置序列化的类
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    //初始化生产者
    KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
            ...
    }

        注:关于生产者客户端的参数,建议使用 ProducerConfig  类的字符串常量,可以避免手动输入时拼写错误,该类中定义了生产者客户端可能用到的所有参数,并对每个参数的含义进行了解释。同理,消费者客户端的参数可以通过 ConsumerConfig 类来获取。

    acks 参数的作用机制

        当 Broker 端收到生产者客户端的写数据请求,会调用 ReplicaManager的appendRecords() 方法来执行写数据操作,该方法签名如下:

      def appendRecords(timeout: Long,//请求处理的超时时间
      requiredAcks: Short,//请求acks设置
      internalTopicsAllowed: Boolean,//是否允许写入内部主题
      isFromClient: Boolean,//写入方来源
      entriesPerPartition: Map[TopicPartition, MemoryRecords],//待写入消息
      responseCallback: Map[TopicPartition, PartitionResponse] => Unit,//回调逻辑
      delayedProduceLock: Option[Lock] = None,//用来保护消费者组操作线程安全的锁对象
      //消息格式转换操作的回调统计逻辑,主要用于统计消息格式转换操作过程中的一些数据指标,比如总共转换了多少条消息,花费多长时间等
      recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()
                         ) 
                         ...
          其中的 requiredAcks 参数就是写请求中携带的 acks 参数。
          appendRecords 方法首先会验证 acks 参数的合法性,然后将数据先写入本地日志
        if (isValidRequiredAcks(requiredAcks)) {
        val sTime = time.milliseconds
        //TODO 数据追加到本地日志
        val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
        isFromClient = isFromClient, entriesPerPartition, requiredAcks)
          ...
            然后通过 delayedProduceRequestRequired 方法判断是否需要等待其它副本完成写入,如果 acks = -1,则需要等待。
          //是否需要等待其它副本完成写入,通过delayedProduceRequestRequired方法来判断写入是否成功,
          //如果为true,表示需要等待其它副本同步完成消息写入
          if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
          val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
          //创建DelayedProduce延时请求对象
          val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)


            val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
          //再一次尝试完成该延时请求
          //如果暂时无法完成,则将对象放入到相应的Purgatory中等待后续处理
          delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
              无论是 Leader 副本写入本地日志,还是 Follower 副本拉取 Leader 副本的消息写入日志,都会调用 Log.append() 方法,注意在这个方法的最后,有如下代码:
            //是否需要手动刷盘。一般情况下不需要设置Broker端参数log.flush.interval.messages
            //刷盘操作交由操作系统来完成。但某些情况下,可以设置该参数来确保高可靠性
            //config.flushInterval默认值为Long.MaxValue,即默认这个分支条件不会满足,不执行flush方法
            if (unflushedMessages >= config.flushInterval)
            flush()
                其作用是判断是否执行 flush() 刷盘操作,unflushedMessages 表示未刷盘的消息条数,config.flushInterval 表示在消息数量上的刷盘间隔,由服务端参数 log.flush.interval.messages 配置,默认值为 Long.MaxValue 。
                除了在消息数量上控制刷盘间隔,在间隔时间上也有一个参数来控制刷盘间隔。在 Kafka 中有一个 LogManager 类,用来管理日志对象,内部启动了几个后台线程,用来处理日志相关的定时任务,其 startup() 方法如下:
              def startup() {
              if (scheduler != null) {
              info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
              //定时清理超时的文件
              //清理间隔:由参数 log.retention.check.interval.ms 配置,默认5分钟
              scheduler.schedule("kafka-log-retention",
              cleanupLogs _,
              delay = InitialTaskDelayMs,
              period = retentionCheckMs,
              TimeUnit.MILLISECONDS)
              info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
              //定时将内存中的数据刷写磁盘
              //时间间隔:由参数 log.flush.scheduler.interval.ms 配置,默认为Long.MAXVALUE
              scheduler.schedule("kafka-log-flusher",
              flushDirtyLogs _,
              delay = InitialTaskDelayMs,
              period = flushCheckMs,
              TimeUnit.MILLISECONDS)
              //定时更新检查点文件,kafka有时会重启,重启会依赖于检查点文件恢复数据
              //时间间隔:由参数 log.flush.offset.checkpoint.interval.ms 配置,默认60秒
              scheduler.schedule("kafka-recovery-point-checkpoint",
              checkpointLogRecoveryOffsets _,
              delay = InitialTaskDelayMs,
              period = flushRecoveryOffsetCheckpointMs,
              TimeUnit.MILLISECONDS)
              //定时更新Log起始偏移量检查点
              //时间间隔:由参数 log.flush.start.offset.checkpoint.interval.ms 配置,默认60秒
              scheduler.schedule("kafka-log-start-offset-checkpoint",
              checkpointLogStartOffsets _,
              delay = InitialTaskDelayMs,
              period = flushStartOffsetCheckpointMs,
              TimeUnit.MILLISECONDS)
              //清理待删除文件
              scheduler.schedule("kafka-delete-logs",
              deleteLogs _,
              delay = InitialTaskDelayMs,
              unit = TimeUnit.MILLISECONDS)
              }
              //默认为true
              if (cleanerConfig.enableCleaner)
              cleaner.startup()
              }
                  刷盘的时间间隔由 log.flush.scheduler.interval.ms 参数控制,默认值同样为 Long.MaxValue ,这也就意味着默认情况下 Kafka 内部不会触发刷盘操作,而是交由操作系统完成。如果手动配置了这两个参数,虽然可以通过同步刷盘来提高可靠性,但同时也会严重影响其吞吐性能,一般不建议。
                  所以,默认情况下 acks 设置为 -1(或者all),只是保证消息写入了 ISR 列表中所有副本的本地缓存,并不保证写入磁盘。所以如果这些副本所在节点集体宕机,数据照样会丢,只不过发生这种情况的概率很小。
                  注意这里是写入 ISR 列表中所有副本的本地缓存,而不是该分区的所有副本。这里就涉及到了另一个服务端参数:
              • min.insync.replicas
                  该参数表示 ISR 列表中最少的副本数,默认为 1。如果不修改这个参数,当所有的 Follower 副本被踢出 ISR 列表后,此时只剩 Leader 副本,那么设置 acks = -1 其实和设置 acks = 1 的效果是一样的,只要写入 Leader 副本就会返回写成功,如果此时 Leader 副本所在节点宕机,消息就会丢失。所以一般会设置 1< min.insync.replicas <= 副本总数,如果为 3 副本,可以设置该值为 2。至于为什么不设置为 3,是对可靠性和可用性的权衡,如果设置为 3,消息虽然更可靠,但是一旦有一个副本被踢出 ISR 列表,那么将导致消息无法写入,可用性无法保证。
                  除此之外,和可靠性及 ISR 列表有关的还有一个参数:
              • unclean.leader.election.enable
                  该参数表示如果 Leader 副本下线,是否可以从不在 ISR 列表的 Follower 副本中选择一个成为 Leader,默认为 false。如果设置成 true,由于被踢出 ISR 列表的副本还未完全同步 Leader 副本的数据,也会造成数据的丢失。当然,这里也会涉及到可靠性和可用性的权衡,如果设置为 false,可以提高可靠性;但如果 Leader 副本下线,且 ISR 列表中没有其它 Follower 副本,将导致无可用副本,降低了可用性。

                  以上主要为服务端的可靠性保证,除此之外,还要考虑客户端生产者及消费者对可靠性的影响。

                  对于生产者:发送消息有三种模式:发后即忘、同步和异步。使用中要避免采用发后即忘的模式;对于同步或者异步模式,在出现异常时可以获得通知,并对异常采取相应的补救措施,比如重新进行发送等。以此来避免发送失败导致的数据丢失。

                  对于消费者:要确保生产者写入服务端的数据被正确消费,如果没有消费到,对于应用而言数据也是丢失的。enable.auto.commit 参数的默认值为 true,即开启消费偏移量自动提交的功能,自动提交编写简单,但可能带来重复消费或者消息丢失的问题。所以如果要提高可靠性,需要将该参数设置为 false 来执行手动提交偏移量,即消息只有成功消费后,才可以提交消费偏移量。如果部分消息一直不能被成功消费,为了不影响整体消费进度,可以将这类消息暂存到死信队列,以便后续故障排除。

                  总结:忽略服务器集体故障等外界因素,就 Kafka 本身使用方式而言,消息绝对可靠是很难保障的,但可以通过一些配置来提高可靠性,比如设置多副本、设置 acks = -1、设置 min.insync.replicas > 1、关闭 unclean 选举等。但提高可靠性的同时,往往会对可用性或吞吐量造成影响,所以在面对不同的应用场景时,需要对各方面进行权衡,选择合适的参数配置。
              参考:《深入理解Kafka核心设计与实践原理》
              文章转载自大数据记事本,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

              评论