暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

你用的Kafka真的100%消息不丢失?

Renk 2020-04-24
844

,本篇主要内容导航:

1,kafka 简介

2,kafka 组件介绍

3,kafka 原理

4,kafka 生产落地

5,kafka 调优

简介

    kafka 是一个分布式流平台,具有发布和订阅记录流,类似于消息队列或企业消息传递系统;以容错的持久方式存储记录流;处理记录流。通常用于建立实时流数据管道,以可靠地在系统或应用程序之间获取数据,构建实时流应用程序,以转换或响应数据流。

    以上是官方介绍,简单理解为,它是一个发布订阅的消息系统,同时支持持久化消息和处理流数据,广泛用于应用间解耦,大数据处理,异步削峰填谷及流处理。下面我们在学习kafka之前还是先来了解它由哪些组件组成他的核心应用API工具。

核心组件

    kafka 具有多个核心的API/组件,分别是:

1)Producer:消息生产者/发布记录端

2)Consumer:消息消费者/订阅记录端

3)Topic:消息主题(逻辑概念)/类别,划分消息的所属类别,对消息进行分组。

4)Connector:数据源连接器,比如,DB连接器,可以获取表中的内容交由处理。

5)Stream:流处理器,允许将输入信息进行流式处理,输出一个或多个流。

6)Partition:分区(物理存在),消息记录log日志,一个topic可以有一个或多个partition。

7)Broker:kafka集群中的服务器进程

8)Admin:管理和检查topic

9)Segment:partition会被成分割成多个大小相等的segment数据文件

10)Replica:partition保障高可用的副本。

11)Leader:同一个topic中各partition选举出来的一个角色。producer和consumer只与其交互。

12)Follower:同一个topic下leader的跟随同步者。

13)Controller:kafka集群中各个broker服务器选举出来的一个角色。用来管理leader或failover等

14)Zookeeper:用于broker的管理,保存broker们的元数据。

15)Consumer Group:消费者组,同一个topic下每条消息只能被group中的一个consumer消费。

    上面介绍了kafka集群里面的各种组成,可能还有部分人并不知道,kafka是干什么的或者这个发布订阅到底是什么。下面我们来看一张图,就理解发布->订阅了。

    我们看到,生产者负责将消息发布到消息中间件(如kafka),消息中间件负责将消息存起来。消费者同时主动订阅了这个主题的消息。然后,消息中间件就会将刚才那条消息发给消费者,用于消费者使用。所以看我们将kafka、rocketmq、rabbitmq、activemq等称为消息中间件,也是因为作为生产与消费者之间的一个中间组件,当然也有些人称为消息队列是一个道理。

    你说为什么不直接从生产者发到消费者呢?多快啊!中间还搞一套这么复杂的东西。确实直接调用是快一些。这样做的好处也是多多。解耦这点最容易想到了。你要是有很多个服务,他们之间很多相互调用的关系的话。维护特别麻烦。出了故障查找原因极为不便。通过消息中间件生产者与消费者,就不必维护对方的调用关系了。统一走发布订阅这种模式,就算其中一个挂掉,也不会影响另外一个的正常运行。也只是业务处理失败而已。

    另外,消息中间件还有异步削峰的功能。比如双十一同一时间,涌入大量下单请求。这个时候。如果使用直调方式。肯定服务扛不住。但是使用消息队列这种就可以处理。让订单堆积在中间件里面。然后消费者看能力消费订单数据。可以多并发批量消费。也可以增加消费者同时消费。

    针对kafka流式处理也是不得不说的一个功能。举个例子,A用户平时付款都是50以内的金额。突然来了一笔1万元的大额金额消费,这个时候,我们可以通过kafka的stream处理对这笔消费消息,只要检测到了就推送用户一条消息。给予提示,避免事故。

原理

    上面讲了那么多,应该理解了消息中间件的基础流程,接下来我们讲一下本篇的主角kafka的基本原理,kafka依赖于zookeeper(分布式协调器)管理集群的元数据及做一些选举操作。

    那什么是kafka的选举?我们知道一台服务器有太多不可控因素影响其运行。什么断电、网络、火灾,程序故障等等,有软件原因也有硬件原因。总之,单台服务器会存在单点故障。那么为了保证我们的服务不受单点故障影响。那么我们就用多台服务器来运行。所以就出现了,主备,主从等高可用方案。

    主备方案(主机为客户端提供服务,备机在主机挂掉的情况下升级为主机,平时不对外提供服务),在之前将Hadoop之HDFS原理这篇文章中就讲过NameNode是主备。

    主从方案(主机对外提供服务,从机也对外提供有限服务),kafka集群里面就是基于这种方案。集群中多个broker在zookeeper中注册并上报元数据(各broker中的ip,name,topic,partition等等),选举出一个controller,主要负责管理topic中partition分区和replica副本的状态的变化,以及执行重分配分区之类的管理任务。

    在kafka中或者其他消息中间件里,消息都是以topic来分类,为什么要强调topic是一个逻辑概念,因为一个topic包含的消息内容有可能分散在集群中的不同机器上。而将topic分散在不同物理机上的就是pratition(分区),分区相当于一个目录用于存储消息的数据/索引,每个 Topic 分为多个 Partition。前面也讲过,任何单体的东西都有单点故障,partition也不例外。虽然一个topic下多个partition分布在不同的机器上。但是如果所在的其中一个机器宕机了。也就造成这个topic缺少了一部分partition,数据就丢失了。kafka的处理方式就是,为每个partition生成多个副本(replica)放在不同的机器上。副本之间再选举出一个leader,其他的都称作为follower(跟随者),所有follower都会与leader保持心跳同步数据(是不是和hdfs存储数据块很像)。Kafka动态维护这些同步状态的副本的集合(简称ISR,在zookeeper元数据中),当检测到有副本下线,则从ISR集合中去除。如果leader所在的机器下线了。则从新选举出新的leader,就算前一个leader恢复了。也只能作为新的leader的follower跟随者同步数据。来看图:

    就像上图这样。有一点要注意,不管生产消息还是消费消息。都是通过leader节点处理消息的。follower不对外提供读写,这其中一个原因就是是为了防止follower与leader数据不同步造成信息不一致。当然牺牲了一定的读写性能,相对于读写分离而言。

    leader与follower之间的数据同步是异步拉取的方式。这里给讲一下,一般的消息同步有两种方式一种是拉取(pull),一种是推送(push)。它们各有优缺点,先说推送这种方式,优点就是及时,一有信息订阅方就可以收到。没有消息的时候,就维持两端的连接心跳,网络开销不大。缺点也是相对的。如果订阅方/消费方处理能力有限的话,就有可能造成消息在消费端堆积。势必影响效率。只能靠增加消费者来避免消息堆积/延迟带来的影响。而拉取这种方式就有点相反。一般拉取都是有时间间隔的。有可能刚好在时间间隔之下,有消息到达。不能及时消费,要等下次拉取时才能获取得到。网络开销方面肯定会比push方式大。优点则是对于消费者可以灵活处理消息。拉取消息视处理能力而定。如果增加消费者,同样很容易处理更多消息。

    生产者将消息发送给kafka之后,会将消息存入本地磁盘,防止因服务器宕机,消息丢失。消费者想要消费数据的时候。kafka就会从磁盘里面把消息读出来并发给消费者。我们知道,一般的读写磁盘的效率是很低的。那么kafka怎么做到百万级的消息发送的呢?用了哪些主要的技术呢?

    第一,kafka存储消息到磁盘,用的是顺序读写加稀疏索引技术,实现快速读写功能。顺序读写,顾名思义,往文件后面追加写入数据。这性能堪比内存的随机读写速度。当然写入是可以了,读取呢,难道要从头到尾读一遍找出需要的消息内容?肯定不会啦。将稀疏索引之前,我们来举个例子。我们查字典的时候。可以通过笔画或者拼音找到要查的字,那么当用拼音查找的时候,我们会先找到声母,所在的位置,然后韵母所在的位置。同样的道理,kafka的稀疏索引就是通过Segment文件实现的,Segment File 又由 index file 和 data file 组成,他们总是成对出现,后缀 ".index" 和 ".log" 分表表示 Segment 索引文件和数据文件。如图:

    index文件存储,消息所在的偏移量,log文件存储消息数据。查找消息时,通过id在index文件中定位在哪一个段。然后在log文件中二分法查找该消息。这样不管是内存开销还是速率都有保证。

    第二,使用了os cache(零拷贝)技术。我们先来看一下,一般普通读取磁盘信息需要经过哪些步骤,我们都知道,应用进程都是在操作系统之上运行的。当应用要读取磁盘里面的信息的时候,会先将请求发给操作系统的内存缓冲区(os cache)。交由操作系统去完成磁盘的io,比我们应用程序直接去操作磁盘,效率要高很多。内核缓冲区将读取的内容返回给应用程序内存。然后我们的应用程序可以中间做一些处理。再丢给socket 缓冲区。最后交由网卡等各种卡发送给需要的终端,经历了物理硬件-->内核态-->用户态-->内核态-->物理硬件这个过程。

    不知道你看出来没,这流程有很多冗余的步骤。要读取一条数据,机器里面要经历至少4次拷贝。看如下图的步骤:

    如果我们只是单纯的读取,对数据不做任何处理,这里其实我们完全可以省掉②,③两个步骤,直接发个指令让操作系统读取磁盘之后将数据写入网卡等各种卡。时间与空间效率都能提升不少。你可能会问,为什么不减少①,④两步,因为直接让我们的应用程序操作磁盘,然后在操作socket等。io效率低下,肯定没有操作系统管理的优秀。同时对cup的利用也是非常高的,同时有肯能带来GC等问题。这种事就不麻烦我们了。交由操作系统搞定。您能想到的kafka都想到了,kafka使用零拷贝技术之后的读取大概流程也是交由操作系统完成读取发送,给张图体验一下:

    我们在来看一张比较完整的流程图,直观感受一下:

 

    第三,如果只是因为前面介绍的那两个原因,可能还不足以让kafka提供高并发支持。因为还存在写入瓶颈。我们知道每次线程请求发送数据的时候,内存使用完了,就会回收,然后GC。如果发送消息特别频繁,那内存使用-回收这个过程也很频繁。造成的后果也就是GC特别频繁。程序运行效果很差。kafka对于写入使用了批量压缩/打包,发送消息,这种方式的好处就在于,吞吐量巨大。消息生产者中会用到一个BufferPool(内存缓冲池),作用就是,当一个线程发送数据,使用内存空间用完之后,不是通过GC回收,而是清空里面的内容并放回内存池中。下次要使用内存的时候,直接到缓冲池里面去取。这样就不会造成频繁的GC了。这点还不够,如果只是来一条消息发送一条,这网络io也不得不是一个问题。kafka做的就是,把要发送的消息先攒起来组装成一个大的requests,等到一定时候或者requests达到一定大小,一次性发出去。网络开销也不是很大。所以这就是kafka吞吐量巨大的原因所在,(rocketmq也是参考的kafka,但是这一点没有借鉴。),还是来张图:

生产落地

    上面基本上介绍完了kafka的基本原理及几个核心的技术,下面我们来看一下,在生产实践中怎么让其落地为我们服务的。 

    要保证消息全链路不丢失,我们先要知道,什么情况下有可能丢失。我们还是看一下整个发送流程需要注意的几个步骤:

  

    从这张图可以清晰的看到,整个kafka消息发送到消息消费,有可能出问题的5个步骤。不管是出于网络原因还是机房地震,还是程序bug都有可能。来一步步分析,provider发送消息这一步好理解,就是一次网络请求;消息到达kafka服务并将消息持久化到磁盘(落盘);然后反馈给发送端,消息已落盘;消费者拉取消息来消费这一步也是网络操作;消费者开始消费消息或者说处理完成这条消息,处理成功到服务端一个响应,失败则重试直到一定次数之后消息被放入死信队列;kafka服务端收到响应,就会把读取消息的偏移量标记为下一条消息的偏移。

    发送端即provider发送消息到kafka服务也就是①步,有可能失败,这个时候我们可以设置重试机制来保证,步骤②落盘即将消息存入磁盘是有失败的可能性。消息发送到落盘有两种方式,第一同步刷盘方式,也就是说从provider生产数据到kafka 消息写入磁盘同步操作。写入失败则返回给provider发送失败让其重试等,第二异步刷盘,也就是只要写入消息,不管是否真的存到磁盘,就直接返回发送成功。此种方式优点就是快,吞吐量大,缺点也明显不能保证消息真的保存成功了。我们在生产中可以根据自己的业务场景灵活使用。在生产环境都是多broker集群部署,当然同步刷盘也分两种情况。一种是等到所有的副本都落盘之后再返回给provider成功,另外一种是等到主副本(leader)节点落盘成功即返回成功。两种的差别也是可以想到的,如果写入leader同时保存成功,但是这个leader所在的机器下线了还没来得及同步到其他follower。那么这消息就丢失了。即便再次上线,也只能作为follower不对外提供读写,也不会将其同步到leader中。所以在实际开发中对可用性极高的场景,那可以选择等所有的副本落盘成功,在返回给provider一个成功的receive。

    走到消息消费阶段。④步失败的话很好处理,就在拉取一次消息就可以了。然后就是消息的消费。异常情况下有可能消费者这边出问题了,消费的逻辑没走完就,网络断了、挂机了等。这个时候我们应该重试或者上线后重新再来一遍。可是我这么知道上次的消息是否真的消费了呢?这就用到了ack机制。绝大多数MQ都有这个机制。简单说就是,告诉MQ服务端,这条消息是否被消费成功,如果成功了,那么次条消息就会被删除或者不被下次消费了,如果失败了,则没有收到ack信息,就会继续重发次消息,直到被消费同时返回ack信息为止。当然可以设置重发次数,超过了设定的重发次数,kafka会将消息放入一个死信队列(以“.DIT”结尾的topic),这个时候我们可以通过定时任务来定时处理这些死信。最后在人工介入处理兜底等等。①②③④⑤步生产落地的实现基本上就是这种思路了。

调优

    其实经过上面落地生产的方案实践,我们的产品是没有问题了,但是。如果只是这样的话。这一节也没必要了。我们生产环境各不相同,有的系统可能机器配置高一点,有的网络带宽小一点。为了让kafka充分发挥它的性能,我们还要对其配置参数,内存,网络等进行优化。

    要调优的地方有哪些呢,在官网上就有相应的配置参数详解:https://kafka.apache.org/documentation/#majordesignelements。这里就不一一讲解了,挑几个重点的给讲一下。

    先来看生产者provider:

内存缓冲大小:buffer.memory。还记得前面讲过我们生产者发送消息的时候并不是每次都新开辟内存使用完了就释放,而是在内存缓冲池里面去取,这个参数就是设置这个缓冲池的大小的。根据你实际开发来调整。

请求压缩包大小:batch.size。kafka不是每次网络请求就直接发给服务端,而是攒成一个batch数据包。再将这些包request出去。这里就是这个批量数据包的大小,注意它的大小要视情况而定,大了小了都不好。

攒包时间:linger.ms。什么意思?刚才我们不是说每次的网络请求会攒成batch包。但是如果系统请求量不大,请求间隔时间很长,如果batch.size设大了,那要等到猴年马月才能攒齐啊。所以这个参数就是控制着,如果到了给定的时间还没攒齐,也就当成一个batch发出去了。

数据请求大小:max.request.size。为了保障kafka的网络投递消息更稳定,不至于batch包大小不一造成的抖动,在batch包发出去之前,会判断每次网络投递的消息的最大大小。

回馈机制:required.acks。前面讲过我不多说了,只讲一下,参数的设置,一共三个参数,1,0,-1/all。默认值是1。参数-1/all也就是,所有副本都同步后返回响应成功。0就是只要收到provider的信息就返回成功,不管是否存入磁盘。1代表,只要leader所在的副本写入磁盘成功就返回成功。根据你的业务选择合适的配置。

反馈超时时间:timeout.ms。和上面参数搭配使用,不多介绍。

刷盘方式:producer.type。消息写入磁盘方式:async:异步发送。sync:同步发送。根据生产业务情况选择。

序列化相关:serializer.class和key.serializer.class。

重试次数及间隔时间:max.retries和backoff.ms。

    再来看消费端有几个重要的配置参数:

消费组ID:group.id。多个消费者组成一个消费者组,消费组内可以不重复消费消息。

消费信息反馈:auto.commit.enable。生产一般设为false。不自动提交消息的offset。如果设为true。当消费者未能消费成功。下一次读取到的消息就没有这个未消费成功的消息。

重试最大次数,重连时间和超时时间:max.retries、backoff.ms和timeout.ms。根据实际业务设置。

同步时间:zookeeper.sync.time.ms。follower可以落后ZK leader的最大时间。不能太大或太小。

线程数:num.consumer.fetchers。用于获取数据的线程数及并发处理线程数,根据消费者处理能力而定。

基本上provider和consumer的一些参数配置介绍和调优方向也已讲完了。

补充

重复消费:

    大部分的MQ都不保证幂等。主要是因为MQ只是一个中间件角色。既保证幂等又保证消息不丢失,处理效率会降很多。一般都是通过自己业务处理重复消费。比如加状态机判断,唯一索引等等。

顺序消费:

    我们的kafka处理发送消息接收消息是不那么有序的。但在实际开发中我们有一些场景是需要顺序消费的。比如,一个订单消息过来。先扣库存,在扣款,在加积分,最后发货。这些顺序不能颠倒。这个时候。kafka包括rocketmq等它们的处理方式就是,发送消息方,将顺序消息发送到同一个消息队列(partition)中,然后,消费端订阅者订阅这个topic同时消费对应的partition中的消息。当然这里的消费者不能设置多个,同时消费者不能直接设置并发消费,可以通过在消费者端添加不同的队列。再来控制业务并发消费。具体思路如下图:

由于篇幅有限,就到这里了。回顾一下前面讲的Kafka消息中间件在生产实践的方案,你们是这样的吗?若是还没用过,赶紧试试吧!如果有什么问题或者建议请留言。如本篇对你有用。请帮忙转发,或点个赞吧。





文章转载自Renk,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论