最近在写项目的某个需求时用到了tdmq,因此系统性的学习了消息队列,以下便是一些学习心得。
消息队列(MQ)是大型分布式系统中一种常见的中间件,主要解决应用耦合、异步消息、流量削锋等问题,已成为异步RPC的主要手段之一。目前企业使用较多的消息队列有ActiveMQ、RabbitMQ、Kafka、ZeroMQ、Pulsar等。
为什么需要消息队列?
使用消息队列的场景有很多,结合我在工作中的一些思考,我认为大多数项目引入消息队列都是为了解决错峰流控和服务解耦这两主要问题。
错峰流控
错峰:线上部署的系统所面对的请求量会存在高峰和低谷,例如每天的23:00 - 8:00系统每秒接收的请求数为100,8:00 - 22:00系统每秒接收的请求数为3K+,而到了22:00 - 23:00这个时间段,系统每秒接收的请求数为1W+。面对这种情况,我们不可能将系统的QPS设计为1W+(资源都是宝贵的,不比在学校实验室),这时将MQ引入系统架构中,便解决了这种大流量冲击问题。
假设将系统的QPS设计为4K,满足了8:00 - 22:00的正常流量需求,在22:00 - 23:00时间段里,可以先把所有的接收的请求放进MQ,系统仍然以4K的速度处理从MQ中请求,无法处理的积压在MQ中在23:00 - 8:00完成处理。这样既防止了系统的宕机(系统处理请求的速度始终没超过Max),也使系统在有限的资源情况下平稳的完成大流量请求。
流控:系统的上下游接口对同一请求的处理速度是不一致的,上游的A接口一秒处理完100个请求就会把100份数据直接丢给下游B接口,而B接口的处理速度可能是一秒50个,这种情况下便会让B接口直接挂掉。此时,如果引入了MQ,可以让A接口处理完后直接写入MQ,在让MQ以每秒50个的速度发送给下游B接口处理,协调了系统各接口通信能力。
可以看出,引入MQ后,对请求的处理会存在一定的时延,所以消息队列主要被用来处理一些对于其他模块非常重要但是对于自身模块不关心的业务。
服务解耦
以常见的电商系统下单举例,系统调用链路为:
提交订单-->预扣库存-->生成订单-->付款消费成功-->通知配送系统-->通知商家系统-->通知后台系统计入财务和日志等模块。
当系统没有解耦时,用户的每个订单都是串行调用执行,本人需要等待时间太多,同时也浪费了接口并发带来的性能优化。当引入消息队列对系统架构进行解耦后,系统如下图所示,用户便可在订单系统生成订单后直接返回,其他流程可通过MQ继续调用。这样既不影响整个业务的体验,而且在性能和响应速度上有极大提升。

在引入消息队列后可以让系统更改透明化,上下游系统不再互相影响,实现了系统间的解耦。你只会关注当前模块的需要使用的消息结构,对其他模块的处理逻辑不用关心,适合部门之间的协调工作。
消息队列的演化
基于OS的MQ
单机消息队列可通过操作系统的进程间通信机制实现,如共享内存和消息队列等。
比如可以在共享内存中生成一个双端队列:消息生产进程往队列里添加消息,消息消费进程在队尾有序地取出这些消息。可以把添加消息的进程命名为producer,而取出消息的进程命名为consumer。

单机MQ易于实现,但是缺点也很明显:因为依赖于单机OS的IPC机制,所以无法实现分布式的消息传递,并且消息队列的容量也受限于单机资源。基于数据库的MQ
使用数据库(如Mysql 、Redis等)存储消息, 通过对记录的插入与删除实现消息的生产和消费,继而完成MQ功能。
以Redis 为例, 可以使用Redis自带的list实现。Redis list使用 lpush命令,从队列左边插入数据;使用 rpop命令,从队列右边取出数据。与单机的不同就在于,可以实现分布式环境下多机共用一个消息队列。
存在的缺陷:
热key性能问题:某个list的读写请求最终都会落到同一台redis实例上,且无法通过扩容来解决问题。如果对某个list的并发读写非常高,就产生了无法解决的热key,严重可能导致系统崩溃。
没有消费确认机制:执行rpop便会消费一条数据,那条消息就被从list中永久删除了。如果消费者消费失败,这条消息也没法找回了。
不支持多订阅:一条消息只能被一个consumer消费,rpop之后就没了。如果队列中存储的是应用的日志,对于同一条消息,监控系统需要消费它来进行可能的报警,财务系统需要消费它来绘制报表,链路追踪需要消费它来绘制调用关系,这种场景redis list就无法支持。
不支持二次消费:一条消息rpop之后就没了。如果消费者程序运行到一半发现代码有bug,修复之后想从头再消费一次就不行了。
针对上述缺陷,Redis 5.0引入了stream数据类型,它是专门设计成为消息队列的数据结构,借鉴了很多kafka的设计,但是随着很多分布式MQ组件的出现,仍然显得不够友好。在我看来Redis解决了分布式读数据的问题,MQ解决了分布式处理数据能力,一个读,一个写。
专用MQ中间件
随着互联网业务的发展,一个真正的消息队列,已经不是利用队列收发消息那么简单了,业务对MQ的吞吐量、扩展性、稳定性、可靠性等都提出了严苛的要求。因此,专用的分布式消息中间件开始大量出现。常见的有RabbitMQ、RocketMQ、ActiveMQ、RabbitMQ、Kafka、ZeroMQ、Pulsa等等。
如何设计一个专业消息队列
明确了MQ引入场景和解决的问题,可以看出其本质:消息的转递,将一次RPC消息传递变为两次甚至多次RPC调用。包含了两个关键:
消息的转存。
消息投递的对象和时机。
基于上述两点,设计消息队列需考虑到
确定整体的数据流向:如producer发送给MQ,MQ转发给consumer,consumer回复消费确认,消息删除、消息备份等。
利用RPC将数据流串起来,最好基于现有的RPC框架,尽量做到无状态,方便水平扩展。
考虑如何承载消息堆积,然后在合适的时机投递消息,而处理堆积的最佳方式,就是存储。存储的选型需要综合考虑性能、可靠性和开发维护成本等诸多因素。
消息投递的方式,消费模式push和pull两种。
高级特性,如可靠投递,重复消息,顺序消息等, 很多高级特性之间是相互制约的关系,这里要充分结合应用场景做出取舍。
- 维护消费关系,如单播和多播等,可利用zk/config server等保存消费关系。

MQ的基本设计点
RPC通信
MQ实现一次消息的传递至少要经过两次RPC(一个生产,一个消费),必然涉及到了RPC通信问题。消息队列的RPC和普通RPC一样,对于负载均衡、服务发现、序列化协议等话题都可以利用现有RPC框架来实现,避免重复造轮子。
服务端承载消息堆积的能力
消息到达服务端如果不经过任何处理就到接收者了,broker就失去了它的意义。为了满足我们错峰/流控/最终可达等一系列需求,把消息存储下来,然后选择时机投递就显得是顺理成章的了。这个存储可以做成很多方式,比如存储在内存里,存储在分布式KV里,存储在磁盘里,存储在数据库里等等。
存储系统
存储可以做成很多方式。比如存储在内存里,存储在分布式KV里,存储在磁盘里,存储在数据库里等等。但归结起来,主要有持久化和非持久化两种。
持久化的形式能更大程度地保证消息的可靠性(如断电等不可抗外力),并且理论上能承载更大限度的消息堆积(外存的空间远大于内存)。但并不是每种消息都需要持久化存储。很多消息对于投递性能的要求大于可靠性的要求,且数量极大(如日志)。这时候,消息不落地直接暂存内存,尝试几次failover,最终投递出去也未尝不可。常见的消息队列普遍两种形式都支持。
从速度来看,理论上,文件系统>分布式KV(持久化)>分布式文件系统>数据库,而可靠性却相反。还是要从支持的业务场景出发作出最合理的选择。
消息投递方式push和pull
简要分析下push和pull的优缺点
慢消费
慢消费是push模型最大的致命伤:如果消费者的速度比发送者的速度慢很多,会出现两种恶劣的情况:
1)消息在broker的堆积。假设这些消息都是有用的无法丢弃的,消息就要一直在broker端保存。
2)broker推送给consumer的消息consumer无法处理,此时consumer只能拒绝或者返回错误。
而pull模式下,consumer可以按需消费,不用担心自己处理不了的消息来骚扰自己,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。所以对于慢消费,消息量有限且到来的速度不均匀的情况,pull模式比较合适。
消息延迟与忙等
这是pull模式最大的短板。由于主动权在消费方,消费方无法准确地决定何时去拉取最新的消息。如果一次pull取到消息了还可以继续去pull,如果没有pull取到则需要等待一段时间重新pull。但等待多久就很难判定了,可能有xx动态pull取时间调整算法,但问题的本质在于,有没有消息到来这件事情决定权不在消费方。
消息投递时机
生产者消费者模型中最大的痛点是:消费者到底应该何时进行消费。大处着眼来看,消费动作都是事件驱动的。主要事件包括:
攒够了一定数量。
到达了一定时间。
队列里有新的数据到来。
至于如何选择,也要结合具体的业务场景来决定。对于及时性要求高的数据,可用采用队列里有新的数据到来这种方式完成,比如客户端向服务端投递数据。只要队列有数据,就把队列中的所有数据刷出,否则将自己挂起,等待新数据的到来。
消息投递对象
不管是JMS 规范中的Topic/Queue,Kafka里面Topic/Partition/ConsumerGroup,还是AMQP(如RabbitMQ)的Exchange等等, 都是为了维护消息的消费关系而抽象出来的概念。
本质上,消息的消费无外乎点到点的一对一单播,或一对多广播。另外比较特殊的情况是组间广播、组内单播。比较通用的设计是,不同的组注册不同的订阅,支持组间广播。组内不同的机器,如果注册一个相同的ID,则单播;如果注册不同的ID(如IP地址+端口),则广播。例如pulsar支持的订阅模型有:
Exclusive:独占型,一个订阅只能有一个消息者消费消息。
Failover:灾备型,一个订阅同时只有一个消费者,可以有多个备份消费者。一旦主消费者故障则备份消费者接管。不会出现同时有两个活跃的消费者。
Shared:共享型,一个订阅中同时可以有多个消费者,多个消费者共享Topic中的消息。
Key_Shared:键共享型,多个消费者各取一部分消息。
通常会在公共存储上维护广播关系,如config server、zookeeper等。
MQ的高阶设计点
最终一致性(可靠投递)
如何保证消息完全不丢失?
简单的方案是,在任何不可靠业务发生前,先将消息落地,然后执行业务。当失败或者不知道结果(比如超时)时,消息状态是待发送,定时任务不停轮询所有待发送消息,最终一定可以送达。但是,这样必然导致消息可能会重复,并且在异常情况下,消息延迟较大。
例如:
producer往broker发送消息之前,需要做一次落地。
请求到server后,server确保数据落地后再告诉客户端发送成功。
支持广播的消息队列需要对每个接收者,持久化一个发送状态,直到所有接收者都确认收到,才可删除消息。即对于任何不能确认消息已送达的情况,都要重推消息。
但是,随着而来的问题就是消息重复。在消息重复和消息丢失之间,无法兼顾,要结合应用场景做出取舍。
重复消息
重复消息是不可能100%避免的,除非系统可以允许丢失消息。谈到重复消息,主要是两个话题:
如何鉴别消息重复,并幂等的处理重复消息。
一个消息队列如何尽量减少重复消息的投递。
对于第一点,每一个消息应该有它的唯一身份。不管是业务方自定义的,还是根据IP/PID/时间戳生成的MessageId,如果有地方记录这个MessageId,消息到来是能够进行比对就能完成重复的鉴定。幂等的处理消息是一门艺术,因为种种原因重复消息或者错乱的消息还是来到了,有两种通用的解决方案:版本号和状态机。
对于第二点,减少消息发送的重复,关键操作有两点:
顺序消息
对于push模式,要求支持分区且单分区只支持一个消费者消费,并且消费者只有确认一个消息消费后才能push另外一个消息,还要发送者保证发送顺序唯一。
对于pull模式,kafka的做法为:
但是这样也只是实现了消息的分区有序性,并不一定全局有序。总体而言,要求消息有序的MQ场景还是比较少的。
消费确认
当broker把消息投递给消费者后,消费者可以立即确认收到了消息。但是,有些情况消费者可能需要再次接收该消息(比如收到消息、但是处理失败),即消费者主动要求重发消息。所以,要允许消费者主动进行消费确认。