暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

大数据序列二十三:大数据中的消息系统-kafka(消息重复的场景及解决⽅案)

码上飞驰 2022-01-12
1444

        消息重复和丢失是kafka中很常⻅的问题,主要发⽣在以下三个阶段:

        1:⽣产者阶段

        2:broke阶段

        3:消费者阶段

        

⽣产者阶段重复场景

        根本原因:

        ⽣产发送的消息没有收到正确的broke响应,导致⽣产者重试。

        ⽣产者发出⼀条消息,broke落盘以后因为⽹络等种种原因发送端得到⼀个发送失败的响应或者⽹络中断,然后⽣产者收到⼀个可恢复的Exception重试消息导致消息重复。


        重试过程: 

        过程说明:

    1:new KafkaProducer()后创建⼀个后台线程KafkaThread扫描RecordAccumulator中是否有消息。

     2:调⽤KafkaProducer.send()发送消息,实际上只是把消息保存到RecordAccumulator中。

          3:后台线程KafkaThread扫描到RecordAccumulator中有消息后,将消息发送到kafka集群。

          4:如果发送成功,那么返回成功。

          5:如果发送失败,那么判断是否允许重试。如果不允许重试,那么返回失败的结果;如果允许重试,把消息再保存到RecordAccumulator中,等待后台线程KafkaThread扫描再次发送。


        可恢复异常说明

       异常是RetriableException类型或者TransactionManager允许重试;RetriableException类继承关系如下:


        记录顺序问题

        如果设置 max.in.flight.requests.per.connection ⼤于1(默认5,单个连接上发送的未确认请求的最⼤数量,表示上⼀个发出的请求没有确认下⼀个请求⼜发出了)。⼤于1可能会改变记录的顺序,因为如果将两个batch发送到单个分区,第⼀个batch处理失败并重试,但是第⼆个batch处理成功,那么第⼆个batch处理中的记录可能先出现被消费。

        设置 max.in.flight.requests.per.connection 1,可能会影响吞吐量,可以解决单个⽣产者发送顺序问题。如果多个⽣产者,⽣产者1先发送⼀个请求,⽣产者2后发送请求,此时⽣产者1返回可恢复异常,重试⼀定次数成功了。虽然⽣产者1先发送消息,但⽣产者2发送的消息会被先消费。


        ⽣产者发送重复解决⽅案

        启动kafka的幂等性

        要启动kafka的幂等性,设置:enable.idempotence=true。

         设置ack=all。

         设置retries > 1 。

        ack=0,不重试(可能会丢消息,适⽤于吞吐量指标重要性⾼于数据丢失,例如:⽇志收集

        


⽣产者和broker阶段消息丢失场景

        ack=0,不重试:

        ⽣产者发送消息完,不管结果,如果发送失败也就丢失了。       

        ack=1,leader crash

        ⽣产者发送消息完,只等待Leader写⼊成功就返回了,Leader分区丢失了,此时Follower没来及同步,消息丢失。

        unclean.leader.election.enable 配置true

        允许选举ISR以外的副本作为leader,会导致数据丢失,默认为false。⽣产者发送异步消息,只等待Lead写⼊成功就返回,Leader分区丢失,此时ISR中没有Follower,Leader从OSR中选举,因为OSR中本来落后于Leader造成消息丢失。


        解决⽣产者和broker阶段消息丢失

        禁⽤unclean选举,ack=all

        ack=all / -1

        tries > 1

        unclean.leader.election.enable : false

        ⽣产者发完消息,等待Follower同步完再返回,如果异常则重试。副本的数量可能影响吞吐量,不超过5个,⼀般三个。

        不能允许unclean Leader选举。

        配置:min.insync.replicas > 1

        当⽣产者将 acks 设置为 all (或 -1 )时, min.insync.replicas>1 。指定确认消息写成功需要的最⼩副本数量。达不到这个最⼩值,⽣产者将引发⼀个异常(要么是NotEnoughReplicas,要么是NotEnoughReplicasAfterAppend)。

        当⼀起使⽤时, min.insync.replicas 和 ack 允许执⾏更⼤的持久性保证。⼀个典型的场景是创建⼀个复制因⼦为3的主题,设置min.insync复制到2个,⽤ all 配置发送。将确保如果⼤多数副本没有收到写操作,则⽣产者将引发异常。

        失败的offset单独记录

        ⽣产者发送消息,会⾃动重试,遇到不可恢复异常会抛出,这时可以捕获异常记录到数据库或缓存,进⾏单独处理。

        

消费者数据重复场景及解决⽅案

        根本原因

        数据消费完没有及时提交offset到broker。

        场景

        消息消费端在消费过程中挂掉没有及时提交offset到broke,另⼀个消费端启动拿之前记录的offset开始消费,由于offset的滞后性可能会导致新启动的客户端有少量重复消费。


        解决⽅案

        1:取消⾃动提交

        每次消费完或者程序退出时⼿动提交。这可能也没法保证⼀条重复。

        2:下游做幂等

        ⼀般是让下游做幂等或者尽量每消费⼀条消息都记录offset,对于少数严格的场景可能需要把offset或唯⼀ID(例如订单ID)和下游状态更新放在同⼀个数据库⾥⾯做事务来保证精确的⼀次更新或者在下游数据表⾥⾯同时记录消费offset,然后更新下游数据的时候⽤消费位移做乐观锁拒绝旧位移的数据更新。




                                      ╭─  ─  ─  ─  ─  ─  ─  ─  ─  ─╮
                                      ┃      Flying  Young   
                               ─  ─  ─  ─  ─  ─  ─  ─  ─  ─  ─  ─  ─╯


文章转载自码上飞驰,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论