1.什么是MQ?为什么要使用MQ
MQ:MessageQueue,消息队列。 队列,是一种FIFO 先进先出的数据结构。消息由生产者发送到MQ进行排队,然后按原来的顺序交由消息的消费者进行处理。QQ和微信就是典型的MQ。
MQ的作用主要有以下三个方面:
1. 异步
例子:比如我们最常见的短信验证码功能,当我们在界面点击“获取验证码”后,我们还可以同时进行其他的操作,如输入更新的密码等,此时,我们不需要一直等到手机收到短信了才进行下一步的操作,这就是异步处理,提高了用户体验。
作用:异步能提高系统的响应速度、吞吐量。

2. 解耦
例子:如常见的订单系统,当有订单下单时,我们需要减去库存,但如果订单、库存的逻辑都放在一个系统中,不止处理事件需要很长,系统的耦合性比较高,此时,使用消息中间件,可以实现将订单业务和库存业务抽出来做不同的系统,每次下单的时候可以将下单信息放入消息中间间中,然后库存系统去订阅它,只有有订单数据就进行减去库存操作,这样就将应用解耦了。
作用:
1、服务之间进行解耦,才可以减少服务之间的影响。提高系统整体的稳定性以及可扩展性。
2、另外,解耦后可以实现数据分发。生产者发送一个消息后,可以由一个或者多个消费者进行消费,并且消费者的增加或者减少对生产者没有影响。

3. 削峰
例子:如常见的秒杀系统,如果有5万个商品可以秒杀,没有消息中间件的话,所有的请求都一次性到后台,此时系统很容易卡死,引入消息中间件如消息队列,此时可以在队列中设置好可以存储数据的数量,这样每次用户请求会先但消息队列中,消息队列就减去1,当消息队列中存储长度为0时,直接返回秒杀失败,这样就避免了所有用户请求可能在同一时间到达系统后台,达到流量削峰的作用。
作用:以稳定的系统资源应对突发的流量冲击。

2.RibbitMQ
2.1简介
RabbitMQ 是使用 Erlang 语言开发的开源消息队列系统,基于 AMQP 协议来实现。AMQP 的主要特 征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP 协议更多用在企业 系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
2.2扩展
2.2.1协议
什么是协议
协议:是在TCP/IP协议基础之上构建的种约定成的规范和机制,目的是让客户端进行沟通和通讯。并且这种协议下规范必须具有持久性,高可用,高可靠的性能。
为什么不直接采用TCP/IP协议去传递消息?
因为TCP/IP协议太过于简单,并不能承载消息的内容和载体,因此在此之上增加一些内容,给消息的传递分发高可用提供基础。

消息中间件负责数据的传递,存储,和分发消费三个部分,数据的存储和分发的过程中肯定要遵循某种约定成俗的规范,是采用底层的TCP/IP,UDP协议还是在这基础上自己构建等,而这些约定成俗的规范就称之为:协议。
所谓协议是指:
计算机底层操作系统和应用程序通讯时共同遵守的组约定,只有遵循共同的约定和规范,系统和底层操作系统之间才能相互交流。
和一般的网络应用程序的不同,它主要负责数据的接受和传递,所以性能比较的高。
协议对数据格式和计算机之间交换数据都必须严格遵守规范。
网络协议的三要素
语法:语法是用户数据与控制信息的结构与格式,以及数据出现的顺序。
语义:语义是解控制信息每个部分的意义。它规定了需要发出何种控制信息以及完成的动作与做出什么样的响应。
时序:时序是对事件发生顺序的详细说明。
类比http请求协议
语法:http规定了请求报文和响应报文的格式
语义:客户端主动发起请求称之为请求。(这是一种定义,同时你发起的是post/get请求)
时序:一个请求对应个响应。(一定先有请求在有响应,这个是时序)
而消息中间件采用的并不是http协议,而常见的消息中间件协议有:OpenWire、AMQP、MQTT、Kafka、OpenMessage协议
面试题:为什么消息中间件不直接使用http协议呢?
因为http请求报文头和响应报文头是比较复杂的,包含了cookie、数据的加密解密、状态码、晌应码等附加的功能,但是对于个消息而言,我们并不需要这么复杂,也没有这个必要性,它其实就是负责数据传递,存储,分发就够,要追求的是高性能。尽量简洁,快速。
大部分情况下http大部分都是短链接,在实际的交互过程中,一个请求到响应很有可能会中断,中断以后就不会就行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取消息的过程,出现问题和故障要对数据或消息就行持久化等,目的是为了保证消息和数据的高可靠和稳健的运行。
常用消息中间件协议
AMQP协议(Advanced Message Queuing Protocol—高级消息队列协议)
它由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
特性:分布式事务支、消息的持久化支持、高性能和高可靠的消息处理优势
AMQP典型的实现者是RabbitMQ、ACTIVEMQ等,其中RabbitMQ由Erlang开发

MQTT协议(Message Queueing Telemetry Transport—消息队列遥测传输协议)
它是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。
特点:轻量、结构简单、传输快、不支持事务、没有持久化设计
应用场景:适用于计算能力有限、低带宽、网络不稳定的场景
支持者:RabbitMQ、ACTIVEMQ(默认情况下关闭,需要打开)
kafka协议
基于TCP/IP的二进制协议。消息内部是通过长度来分割,由些基本数据类型组成。
特点:结构简单、解析速度快、无事务支持、有持久化设计
2.3消息队列持久化
持久化简单来说就是将数据存入磁盘,而不是存在内存中随服务器重启断开而消失,使数据能够永久保存。

常见的持久化方式和对比:
ActiveMQ | RabbitMQ | Kafka | RocketMQ | |
文件存储 | 支持 | 支持 | 支持 | 支持 |
数据库 | 支持 | / | / | / |
2.4消息的分发策略
MQ消息队列有如下几个角色:

Producer:消息生产者。负责产生和发送消息到 Broker
Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue
Consumer:消息消费者。负责从 Broker 中获取消息,并进行相应处理
2.4.1生产者产生消息后,MQ进行存储,消费者如何获得消息呢?
一般的获取方式无外乎外推(push)或者(pull)两种方式,典型的git就有推拉机制,我们发送的http请求就是一种典型的拉取数据库数据返回的过程。而消息队列MQ是一种推送过程,而这些推机制会适用到很多的业务场景,也有很多对应的推机制策略
场景分析一

比如我在APP上下了一个订单,我们的系统和服务很多,我们如何得知这个消息被那个系统或者那些服务或者系统进行消费,此时就需要一个消费策略,或称为消费的方法论。
场景分析二

在发送消息的过程中可能会出现异常,或者网络的抖动,故障等等因为造成消息的无法消费,比如用户在下订单,消费MQ接受,订单系统出现故障,导致用户支付失败,那么这个时候就需要消息中间件就必须支持消息重试机制策略。也就是支持:出现问题和故障的情况下,消息不丢失还可以进行重发
2.4.2消息分发策略的机制和对比
ActiveMQ | RabbitMQ | Kafka | RocketMQ | |
发布订阅 | 支持 | 支持 | 支持 | 支持 |
轮询分发 | 支持 | 支持 | 支持 | / |
公平分发 | / | 支持 | 支持 | / |
重发 | 支持 | 支持 | / | 支持 |
消息拉取 | / | 支持 | 支持 | 支持 |
3、MQ的优缺点
上面MQ的所用也就是使用MQ的优点。 但是引入MQ也是有他的缺点的:
系统可用性降低
系统引入的外部依赖增多,系统的稳定性就会变差。一旦MQ宕机,对业务会产生影响。这就需要考虑如何保证MQ的高可用。
系统复杂度提高
引入MQ后系统的复杂度会大大提高。以前服务之间可以进行同步的服务调用,引入MQ后,会变为异步调用,数据的链路就会变得更复杂。并且还会带来其他一些问题。比如:如何保证消费不会丢失?不会被重复调用?怎么保证消息的顺序性等问题。
消息一致性问题
A系统处理完业务,通过MQ发送消息给B、C系统进行后续的业务处理。如果B系统处理成功,C系统处理失败怎么办?这就需要考虑如何保证消息数据处理的一致性。
4、几大MQ产品特点比较
常用的MQ产品包括Kafka、RabbitMQ和RocketMQ。我们对这三个产品做下简单的比较,重点需要理解他们的适用场景。

另外,关于这三大产品更详细的比较,可以参见《kafka vs rabbitmq vs rocketmq.pdf》
关于RabbitMQ的功能特性,可以在官网( https://www.rabbitmq.com/ )上看到,包含 Asynchronous Message(异步消息)、Developer Experience(开发体验)、Distributed Deployment(分布式部署)、Enterprise & Cloud Ready(企业云部署)、Tools & Plugins(工具和插件)、Management & Monitoring(管理和监控)六大部分。所以其中的功能是相当丰富的,而我们肯定只能关注重点的部分内容,所以还是要经常到官网上去看看的。📎kafka vs rabbitmq vs rocketmq.pdf
5.Windows安装RibbitMQ
5.1.先安装otp_win64_24.1.7.exe
5.2.以管理员身份运行
5.3.接着选取要安装的路径,然后一路傻瓜式安装 next 下一步,安装即可。
【注意】不要安装在中文或带空格的文件路径下
5.4.找到刚刚安装的文件夹

5.5.配置系统环境变量


5.6.环境检查

5.7.安装ribbitMQ


一路next就行
注意: 安装路径尽量不要包含中文或者空格
5.8.找到刚刚安装的文件夹,进入到sbin目录

5.9.输入cmd进入cmd窗口

5.10.启用管理插件

5.11.启动rabbitMQ服务

5.12.访问
访问http://localhost:15672,用户名:guest, 密码:guest


6.Linux环境安装RibbitMQ
6.1.安装 Erlang 语言包并检查
rpm -ivh erlang-23.2.7-1.el7.x86_64.rpm
erl -version

6.2.安装RibbitMQ
rpm -Uvh rabbitmq-server-3.9.15-1.el7.noarch.rpm

6.3.启动并查看mq安装情况
whereis rabbitmqctl
service rabbitmq-server start
service rabbitmq-server status

6.4.配置web管理界面
rabbitmq-plugins enable rabbitmq_management

6.5.访问
通过ip+端口访问,但是登录的话无法使用guest登录,因为guest只允许在localhost登录,解决步骤参考6.6往下

6.6.添加admin用户
rabbitmqctl add_user admin admin
rabbitmqctl set_permissions -p admin "." "." ".*"

这样就可以通过admin/admin访问控制台了

7.RibbitMQ基础使用
RabbitMQ搭建完成后,可以在Web控制台上选择Exchange或者Queue来发送消息了,我们可以简单体验下,也可以留到下一部分编程模型时再深入体验。
7.1.添加一个虚拟机

7.2.创建一个经典队列

7.3.页面消息发送
创建完成后,选择这个队列,就可以在页面上直接发送消息以及消费消息了。

8.RabbitMQ基础概念
RabbitMQ是基于AMQP协议开发的一个MQ产品, 首先我们以Web管理页面为 入口,来了解下RabbitMQ的一些基础概念,这样我们后续才好针对这些基础概念 进行编程实战。 可以参照下图来理解RabbitMQ当中的基础概念:

8.1. 虚拟主机 virtual host
这个在之前搭建时已经体验过了。RabbitMQ出于服务器复用的想法,可以在一个RabbitMQ集群中划分出多个虚拟主机,每一个虚拟主机都有AMQP的全套基础组件,并且可以针对每个虚拟主机进行权限以及数据分配,并且不同虚拟主机之间是完全隔离的。
8.2.连接 Connection
客户端与RabbitMQ进行交互,首先就需要建立一个TPC连接,这个连接就是Connection。
8.3.信道 Channel
一旦客户端与RabbitMQ建立了连接,就会分配一个AMQP信道 Channel。每个信道都会被分配一个唯一的ID。也可以理解为是客户端与RabbitMQ实际进行数据交互的通道,我们后续的大多数的数据操作都是在信道 Channel 这个层面展开的。
RabbitMQ为了减少性能开销,也会在一个Connection中建立多个Channel,这样便于客户端进行多线程连接,这些连接会复用同一个Connection的TCP通道,所以在实际业务中,对于Connection和Channel的分配也需要根据实际情况进行考量。
8.4.交换机 Exchange
这是RabbitMQ中进行数据路由的重要组件。消息发送到RabbitMQ中后,会首先进入一个交换机,然后由交换机负责将数据转发到不同的队列中。RabbitMQ中有多种不同类型的交换机来支持不同的路由策略。从Web管理界面就能看到,在每个虚拟主机中,RabbitMQ都会默认创建几个不同类型的交换机来。

交换机多用来与生产者打交道。生产者发送的消息通过Exchange交换机分配到各个不同的Queue队列上,而对于消息消费者来说,通常只需要关注自己感兴趣的队列就可以了。
8.5. 队列 Queue
队列是实际保存数据的最小单位。队列结构天生就具有FIFO的顺序,消息最终都会被分发到不同的队列当中,然后才被消费者进行消费处理。这也是最近RabbitMQ功能变动最大的地方。最为常用的是经典队列Classic。RabbitMQ 3.8.X版本添加了Quorum队列,3.9.X又添加了Stream队列。
8.5.1.Classic 经典队列
这是RabbitMQ最为经典的队列类型。在单机环境中,拥有比较高的消息可靠性。

在这个图中可以看到,经典队列可以选择是否持久化(Durability)以及是否自动删除(Auto delete)两个属性。
其中,Durability有两个选项,Durable和Transient。 Durable表示队列会将消息保存到硬盘,这样消息的安全性更高。但是同时,由于需要有更多的IO操作,所以生产和消费消息的性能,相比Transient会比较低。
Auto delete属性如果选择为是,那队列将在至少一个消费者已经连接,然后所有的消费者都断开连接后删除自己。
后面的Arguments部分,还有非常多的参数,可以点击后面的问号逐步了解。
其实这时,应该结合kafka和RocketMQ这几个MQ产品,对队列有一个更全面的理解。在MQ当中,队列其实是MQ集群中的一个数据分片的最小单位。在MQ集群中,一个Topic会对应多个队列,而这些队列会均匀的分配到集群的各个节点当中。
8.5.2.Quorum 仲裁队列
仲裁队列,是RabbitMQ从3.8.0版本,引入的一个新的队列类型,整个3.8.X版本,也都是在围绕仲裁队列进行完善和优化。仲裁队列相比Classic经典队列,在分布式环境下对消息的可靠性保障更高。官方文档中表示,未来会使用Quorum仲裁队列代替传统Classic队列。
Quorum是基于Raft一致性协议实现的一种新型的分布式消息队列,他实现了持久化,多备份的FIFO队列,主要就是针对RabbitMQ的镜像模式设计的。简单理解就是quorum队列中的消息需要有集群中多半节点同意确认后,才会写入到队列中。这种队列类似于RocketMQ当中的DLedger集群。这种方式可以保证消息在集群内部不会丢失。同时,Quorum是以牺牲很多高级队列特性为代价,来进一步保证消息在分布式环境下的高可靠。
从整体功能上来说,Quorum队列是在Classic经典队列的基础上做减法,因此对于RabbitMQ的长期使用者而言,其实是会影响使用体验的。他与普通队列的区别:

从官方这个比较图就能看到,Quorum队列大部分功能都是在Classic队列基础上做减法,比如Non-durable queues表示是非持久化的内存队列。Exclusivity表示独占队列,即表示队列只能由声明该队列的Connection连接来进行使用,包括队列创建、删除、收发消息等,并且独占队列会在声明该队列的Connection断开后自动删除。
其中有个特例就是这个Poison Message(有毒的消息)。所谓毒消息是指消息一直不能被消费者正常消费(可能是由于消费者失败或者消费逻辑有问题等),就会导致消息不断的重新入队,这样这些消息就成为了毒消息。这些读消息应该有保障机制进行标记并及时删除。Quorum队列会持续跟踪消息的失败投递尝试次数,并记录在"x-delivery-count"这样一个头部参数中。然后,就可以通过设置 Delivery limit参数来定制一个毒消息的删除策略。当消息的重复投递次数超过了Delivery limit参数阈值时,RabbitMQ就会删除这些毒消息。当然,如果配置了死信队列的话,就会进入对应的死信队列。
Quorum队列更适合于 队列长期存在,并且对容错、数据安全方面的要求比低延迟、不持久等高级队列更能要求更严格的场景。**例如 电商系统的订单,引入MQ后,处理速度可以慢一点,但是订单不能丢失。
也对应以下一些不适合使用的场景:
1、一些临时使用的队列:比如transient临时队列,exclusive独占队列,或者经常会修改和删除的队列。
2、对消息低延迟要求高: 一致性算法会影响消息的延迟。
3、对数据安全性要求不高:Quorum队列需要消费者手动通知或者生产者手动确认。
4、队列消息积压严重 : 如果队列中的消息很大,或者积压的消息很多,就不要使用Quorum队列。Quorum队列当前会将所有消息始终保存在内存中,直到达到内存使用极限。
扩展:Raft一致性协议
Raft是一种用于分布式系统中实现一致性的协议,它旨在提供比传统的Paxos算法更易于理解的一种方法。Raft的设计目标是使工程师更容易理解和实现分布式一致性算法,从而推动分布式系统领域的进步。
以下是Raft协议的一些关键概念和机制:
领导者选举(Leader Election):Raft协议基于领导者(leader)和跟随者(follower)的模型。在初始状态下,所有节点都是跟随者。当没有领导者时,任何节点都可以成为候选人并发起选举。选举通过RPC(远程过程调用)来进行,候选人需要获得大多数节点的选票才能成为新的领导者。
日志复制(Log Replication):一旦选出领导者,它就负责接收客户端请求,并将这些请求附加到自己的日志中。然后,领导者会向其他节点发送日志条目的复制请求,确保所有节点都复制了相同的日志记录。只有当大多数节点确认已经应用了某个日志条目时,该日志条目才被认为是已提交的。
安全性(Safety):Raft协议通过引入各种安全机制来确保系统在出现网络分区或其他异常情况时仍能保持一致性。例如,在领导者选举过程中,每个候选人都会包含自己的任期号码,以避免过期的领导者产生。此外,Raft还使用了递增的日志索引来确保日志的一致性和顺序性。
总体而言,Raft协议提供了一种清晰、可理解的方法来实现分布式一致性,使得工程师更容易理解和部署分布式系统。由于其优雅的设计和易用性,Raft协议在实际系统中得到了广泛的应用。
8.5.3.Stream队列
Stream队列是RabbitMQ自3.9.0版本开始引入的一种新的数据队列类型,也是目前官方最为推荐的队列类型。这种队列类型的消息是持久化到磁盘并且具备分布式备份的,更适合于消费者多,读消息非常频繁的场景。

Stream队列的核心是以append-only只添加的日志来记录消息,整体来说,就是消息将以append-only的方式持久化到日志文件中,然后通过调整每个消费者的消费进度offset,来实现消息的多次分发。下方有几个属性也都是来定义日志文件的大小以及保存时间。如果你熟悉Kafka或者RocketMQ,会对这种日志记录消息的方式非常熟悉。这种队列提供了RabbitMQ已有的其他队列类型不太好实现的四个特点:
1、large fan-outs 大规模分发
当想要向多个订阅者发送相同的消息时,以往的队列类型必须为每个消费者绑定一个专用的队列。如果消费者的数量很大,这就会导致性能低下。而Stream队列允许任意数量的消费者使用同一个队列的消息,从而消除绑定多个队列的需求。
2、Replay/Time-travelling 消息回溯
RabbitMQ已有的这些队列类型,在消费者处理完消息后,消息都会从队列中删除,因此,无法重新读取已经消费过的消息。而Stream队列允许用户在日志的任何一个连接点开始重新读取数据。
3、Throughput Performance 高吞吐性能
Strem队列的设计以性能为主要目标,对消息传递吞吐量的提升非常明显。
4、Large logs 大日志
RabbitMQ一直以来有一个让人诟病的地方,就是当队列中积累的消息过多时,性能下降会非常明显。但是Stream队列的设计目标就是以最小的内存开销高效地存储大量的数据。
整体上来说,RabbitMQ的Stream队列,其实有很多地方借鉴了其他MQ产品的优点,在保证消息可靠性的基础上,着力提高队列的消息吞吐量以及消息转发性能。因此,Stream也是在视图解决一个RabbitMQ一直以来,让人诟病的缺点,就是当队列中积累的消息过多时,性能下降会非常明显的问题。RabbitMQ以往更专注于企业级的内部使用,但是从这些队列功能可以看到,Rabbitmq也在向更复杂的互联网环境靠拢,未来对于RabbitMQ的了解,也需要随着版本推进,不断更新。
但是,从整体功能上来讲,队列只不过是一个实现FIFO的数据结构而已,这种数据结构其实是越简单越好。而当前RabbitMQ区分出这么多种队列类型,其实极大的增加了应用层面的使用难度,应用层面必须有一些不同的机制兼容各种队列。所以,在未来版本中,RabbitMQ很可能还是会将这几种队列类型最终统一成一种类型。例如官方已经说明未来会使用Quorum队列类型替代经典队列,到那时,应用层很多工具就可以得到简化,比如不需要再设置durable和exclusive属性。虽然Quorum队列和Stream队列目前还没有合并的打算,但是在应用层面来看,他们两者是冲突的,是一种竞争关系,未来也很有可能最终统一保留成一种类型。至于未来走向如何,我们可以在后续版本拭目以待。
9.RabbitMQ编程模型
RabbitMQ的使用生态已经相当庞大,支持非常多的语言。而就以java而论,也已经支持非常多的扩展。我们接下来会从原生API、SpringBoot集成、SpringCloudStream集成,三个角度来详细学习RabbitMQ的编程模型。在学习编程模型时,要注意下,新推出的Stream队列,他的客户端跟另外两种队列稍有不同。
9.1.原生API
使用RabbitMQ提供的原生客户端API进行交互。先来了解下如何使用Classic和Quorum队列。至于Stream队列,目前他使用的是和这两个队列不同的客户端
9.1.1.Maven依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
9.1.2.基础编程模型
这些各种各样的消息模型其实都对应一个比较统一的基础编程模型。
step1、首先创建连接,获取Channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
step2、声明queue队列
channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments);
如果要声明一个Quorum队列,则只需要在后面的arguments中传入一个参数,x-queue-type,参数值设定为quorum。
Map<String,Object> params = new HashMap<>();
params.put("x-queue-type","quorum");
//声明Quorum队列的方式就是添加一个x-queue-type参数,指定为quorum。默认是classic
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
注意:
1、对于Quorum类型,durable参数就必须是true了,设置成false的话,会报错。同样,exclusive参数必须设置为false
如果要声明一个Stream队列,则 x-queue-type参数要设置为 stream .
Map<String,Object> params = new HashMap<>();
params.put("x-queue-type","stream");
params.put("x-max-length-bytes", 20_000_000_000L); maximum stream size: 20 GB
params.put("x-stream-max-segment-size-bytes", 100_000_000); size of segment files: 100 MB
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
注意:
1、同样,durable参数必须是true,exclusive必须是false。 --你应该会想到,对于这两种队列,这两个参数就是多余的了,未来可以直接删除。
2、x-max-length-bytes 表示日志文件的最大字节数。x-stream-max-segment-size-bytes 每一个日志文件的最大大小。这两个是可选参数,通常为了防止stream日志无限制累计,都会配合stream队列一起声明。
声明的队列,如果服务端没有,那么会自动创建。但是如果服务端有了这个队列,那么声明的队列属性必须和服务端的队列属性一致才行。
step3、Producer根据应用场景发送消息到queue
channel.basicPublish(String exchange, String routingKey, BasicProperties props,message.getBytes("UTF-8"));
其中exchange是一个Producer与queue的中间交互机制。可以让Producer把消息按一定的规则发送到不同的queue,不需要的话就传空字符串
step4、Consumer消费消息
定义消费者,消费消息进行处理,并向RabbitMQ进行消息确认。确认了之后就表明这个消息已经消费完了,否则RabbitMQ还会继续让别的消费者实例来处理。
主要收集了两种消费方式
1、被动消费模式,Consumer等待rabbitMQ 服务器将message推送过来再消费。一般是启一个一直挂起的线程来等待。
channel.basicConsume(String queue, boolean autoAck, Consumer callback);
其中autoAck是个关键。autoAck为true则表示消息发送到该Consumer后就被Consumer消费掉了,不需要再往其他Consumer转发。为false则会继续往其他Consumer转发。
要注意如果每个Consumer一直为false,会导致消息不停的被转发,不停的吞噬系统资源,最终造成宕机。
2、另一种是主动消费模式。Comsumer主动到rabbitMQ服务器上去获取指定的messge进行消费。
关键代码
GetResponse response = channel.basicGet(QUEUE_NAME, boolean autoAck);
3.Stream队列消费 在当前版本下,消费Stream队列时,需要注意三板斧的设置。
channel必须设置basicQos属性。
正确声明Stream队列。
消费时需要指定offset。
具体参看示例代码。注意其中的注释。
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//1、这个属性必须设置。
channel.basicQos(100);
//2、声明Stream队列
Map<String,Object> params = new HashMap<>();
params.put("x-queue-type","stream");
params.put("x-max-length-bytes", 20_000_000_000L); maximum stream size: 20 GB
params.put("x-stream-max-segment-size-bytes", 100_000_000); size of segment files: 100 MB
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
//Consumer接口还一个实现QueueConsuemr 但是代码注释过期了。
Consumer myconsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
System.out.println("========================");
String routingKey = envelope.getRoutingKey();
System.out.println("routingKey >" + routingKey);
String contentType = properties.getContentType();
System.out.println("contentType >" + contentType);
long deliveryTag = envelope.getDeliveryTag();
System.out.println("deliveryTag >" + deliveryTag);
System.out.println("content:" + new String(body, "UTF-8"));
(process the message components here ...)
消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。
没有答复过的消息,服务器会一直不停转发。
channel.basicAck(deliveryTag, false);
}
};
//3、消费时,必须指定offset。 可选的值:
// first: 从日志队列中第一个可消费的消息开始消费
// last: 消费消息日志中最后一个消息
// next: 相当于不指定offset,消费不到消息。
// Offset: 一个数字型的偏移量
// Timestamp:一个代表时间的Data类型变量,表示从这个时间点开始消费。例如 一个小时前 Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000)
Map<String,Object> consumeParam = new HashMap<>();
consumeParam.put("x-stream-offset","next");
channel.basicConsume(QUEUE_NAME, false,consumeParam, myconsumer);
channel.close();
这三点要尤其注意,因为当前版本的错误提示非常让人着急。
step5、完成以后关闭连接,释放资源
channel.close();
9.1.3.官网的消息场景
原生API重点就是学习并理解RabbitMQ的官方消息模型。具体参见 https://www.rabbitmq.com/getstarted.html 。其中可以看到,RabbitMQ官方提供了总共七种消息模型,这其中,6 RPC部分是使用RabbitMQ来实现RPC远程调用,这个场景通常不需要使用MQ来实现,所以也就不当作重点来学习。而7 Publisher Confirms是当前版本新引进来的一种消息模型,对保护消息可靠性有很重要的意义。
这些消息模型基本上涵盖了日常开发中的绝大部分场景,而对于他们的API使用,其实都是大同小异,非常容易上手,并且在实际开发中,一般也有其他更好的框架来整合RabbitMQ使用 。所以对这一部分的学习,理解业务场景是最为重要的。


9.1.3.1.hello world初体验

最直接的方式,P端发送一个消息到一个指定的queue,中间不需要任何exchange规则。C端按queue方式进行消费。
关键代码:(其实关键的区别也就是几个声明上的不同。)
producer:
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
consumer
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
9.1.3.2.Work queues 工作序列

工作任务模式,领导部署一个任务,由下面的一个员工来处理。
producer:
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); 任务一般是不能因为消息中间件的服务而被耽误的,所以durable设置成了true,这样,即使rabbitMQ服务断了,这个消息也不会消失
channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
Consumer:
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
channel.basicQos(1);
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
这个模式应该是最常用的模式,也是官网讨论比较详细的一种模式,所以官网上也对这种模式做了重点讲述。
首先。Consumer端的autoAck字段设置的是false,这表示consumer在接收到消息后不会自动反馈服务器已消费了message,而要改在对message处理完成了之后,再调用channel.basicAck来通知服务器已经消费了该message.这样即使Consumer在执行message过程中出问题了,也不会造成message被忽略,因为没有ack的message会被服务器重新进行投递。
但是,这其中也要注意一个很常见的BUG,就是如果所有的consumer都忘记调用basicAck()了,就会造成message被不停的分发,也就造成不断的消耗系统资源。这也就是 Poison Message(毒消息)其次,官方特意提到的message的持久性。关键的message不能因为服务出现问题而被忽略。还要注意,官方特意提到,所有的queue是不能被多次定义的。如果一个queue在开始时被声明为durable,那在后面再次声明这个queue时,即使声明为 not durable,那这个queue的结果也还是durable的。
然后,是中间件最为关键的分发方式。这里,RabbitMQ默认是采用的fair dispatch,也叫round-robin模式,就是把消息轮询,在所有consumer中轮流发送。这种方式,没有考虑消息处理的复杂度以及consumer的处理能力。而他们改进后的方案,是consumer可以向服务器声明一个prefetchCount,我把他叫做预处理能力值。channel.basicQos(prefetchCount);表示当前这个consumer可以同时处理几个message。这样服务器在进行消息发送前,会检查这个consumer当前正在处理中的message(message已经发送,但是未收到consumer的basicAck)有几个,如果超过了这个consumer节点的能力值,就不再往这个consumer发布。
这种模式,官方也指出还是有问题的,消息有可能全部阻塞,所有consumer节点都超过了能力值,那消息就阻塞在服务器上,这时需要自己及时发现这个问题,采取措施,比如增加consumer节点或者其他策略另外 官网上没有深入提到的,就是还是没有考虑到message处理的复杂程度。有的message处理可能很简单,有的可能很复杂,现在还是将所有message的处理程度当成一样的。还是有缺陷的,但是目前也只看到dubbo里对单个服务有权重值的概念,涉及到了这个问题。
9.1.3.3.Publish/Subscribe 订阅 发布 机制
type为fanout 的exchange:

这个机制是对上面的一种补充。也就是把preducer与Consumer进行进一步的解耦。producer只负责发送消息,至于消息进入哪个queue,由exchange来分配。如上图,就是把producer发送的消息,交由exchange同时发送到两个queue里,然后由不同的Consumer去进行消费。
关键代码 ===》 producer: 只负责往exchange里发消息,后面的事情不管。
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
receiver: 将消费的目标队列绑定到exchange上。
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
关键处就是type为”fanout” 的exchange,这种类型的exchange只负责往所有已绑定的队列上发送消息。
9.1.3.4.Routing 基于内容的路由
type为”direct” 的exchange

这种模式一看图就清晰了。 在上一章 exchange 往所有队列发送消息的基础上,增加一个路由配置,指定exchange如何将不同类别的消息分发到不同的queue上。
Producer:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
Receiver:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueBind(queueName, EXCHANGE_NAME, routingKey1);
channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);
channel.basicConsume(queueName, true, consumer);
9.1.3.5.Topics 话题
type为"topic" 的exchange

这个模式也就在上一个模式的基础上,对routingKey进行了模糊匹配
单词之间用,隔开,* 代表一个具体的单词。# 代表0个或多个单词。
Producer:
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
Receiver
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.queueBind(queueName, EXCHANGE_NAME, routingKey1);
channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);
channel.basicConsume(queueName, true, consumer);
9.1.3.6.RPC 远程调用
远程调用是同步阻塞的调用远程服务并获取结果。
RPC远程调用机制其实并不是消息中间件的处理强项。毕竟消息队列机制很大程度上来说就是为了缓冲同步RPC调用造成的瞬间高峰。而RabbitMQ的同步调用示例,看着也确实怪怪的。并且,RPC远程调用的场景,也有太多可替代的技术会比用消息中间件处理得更优雅,更流畅。
A note on RPC
Although RPC is a pretty common pattern in computing, it’s often criticised. The problems arise when a programmer is not aware whether a function call is local or if it’s a slow RPC. Confusions like that result in an unpredictable system and adds unnecessary complexity to debugging. Instead of simplifying software, misused RPC can result in unmaintainable spaghetti code.
Bearing that in mind, consider the following advice:
● Make sure it’s obvious which function call is local and which is remote.
● Document your system. Make the dependencies between components clear.
● Handle error cases. How should the client react when the RPC server is down for a long time?
When in doubt avoid RPC. If you can, you should use an asynchronous pipeline - instead of RPC-like blocking, results are asynchronously pushed to a next computation stage.
官网上这一大堆说明,其实我觉得就是表明,叫你不要用消息中间件来做RPC。所以关于这个RPC调用功能,就不再多做解释了。代码实现可以参见官网,或者配套的示例代码。
9.1.3.7:Publisher Confirms 发送者消息确认
RabbitMQ的消息可靠性是非常高的,但是他以往的机制都是保证消息发送到了MQ之后,可以推送到消费者消费,不会丢失消息。但是发送者发送消息是否成功是没有保证的。我们可以回顾下,发送者发送消息的基础API:Producer.basicPublish方法是没有返回值的,也就是说,一次发送消息是否成功,应用是不知道的,这在业务上就容易造成消息丢失。而这个模块就是通过给发送者提供一些确认机制,来保证这个消息发送的过程是成功的。
如果了解了这个机制就会发现,这个消息确认机制就是跟RocketMQ的事务消息机制差不多的。而对于这个机制,RocketMQ的支持明显更优雅。
发送者确认模式默认是不开启的,所以如果需要开启发送者确认模式,需要手动在channel中进行声明。
channel.confirmSelect();
在官网的示例中,重点解释了三种策略:
1、发布单条消息
即发布一条消息就确认一条消息。核心代码:
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
channel.basicPublish("", queue, null, body.getBytes());
channel.waitForConfirmsOrDie(5_000);
}
channel.waitForConfirmsOrDie(5_000);这个方法就会在channel端等待RabbitMQ给出一个响应,用来表明这个消息已经正确发送到了RabbitMQ服务端。但是要注意,这个方法会同步阻塞channel,在等待确认期间,channel将不能再继续发送消息,也就是说会明显降低集群的发送速度即吞吐量。
官方说明了,其实channel底层是异步工作的,会将channel阻塞住,然后异步等待服务端发送一个确认消息,才解除阻塞。但是我们在使用时,可以把他当作一个同步工具来看待。
然后如果到了超时时间,还没有收到服务端的确认机制,那就会抛出异常。然后通常处理这个异常的方式是记录错误日志或者尝试重发消息,但是尝试重发时一定要注意不要使程序陷入死循环。
2、发送批量消息
之前单条确认的机制会对系统的吞吐量造成很大的影响,所以稍微中和一点的方式就是发送一批消息后,再一起确认。
核心代码:
int batchSize = 100;
int outstandingMessageCount = 0;
long start = System.nanoTime();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
ch.basicPublish("", queue, null, body.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
ch.waitForConfirmsOrDie(5_000);
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0) {
ch.waitForConfirmsOrDie(5_000);
}
这种方式可以稍微缓解下发送者确认模式对吞吐量的影响。但是也有个固有的问题就是,当确认出现异常时,发送者只能知道是这一批消息出问题了, 而无法确认具体是哪一条消息出了问题。所以接下来就需要增加一个机制能够具体对每一条发送出错的消息进行处理。
3、异步确认消息
实现的方式也比较简单,Producer在channel中注册监听器来对消息进行确认。核心代码就是一个:
channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2);
按说监听只要注册一个就可以了,那为什么这里要注册两个呢?如果对照下RocketMQ的事务消息机制,这就很容易理解了。发送者在发送完消息后,就会执行第一个监听器callback1,然后等服务端发过来的反馈后,再执行第二个监听器callback2。
然后关于这个ConfirmCallback,这是个监听器接口,里面只有一个方法: void handle(long sequenceNumber, boolean multiple) throws IOException; 这方法中的两个参数,
sequenceNumer:这个是一个唯一的序列号,代表一个唯一的消息。在RabbitMQ中,他的消息体只是一个二进制数组,并不像RocketMQ一样有一个封装的对象,所以默认消息是没有序列号的。而RabbitMQ提供了一个方法int sequenceNumber = channel.getNextPublishSeqNo());来生成一个全局递增的序列号。然后应用程序需要自己来将这个序列号与消息对应起来。没错!是的!需要客户端自己去做对应!
multiple:这个是一个Boolean型的参数。如果是false,就表示这一次只确认了当前一条消息。如果是true,就表示RabbitMQ这一次确认了一批消息,在sequenceNumber之前的所有消息都已经确认完成了。
9.2.Springboot具体使用
SpringBoot官方就集成了RabbitMQ,所以RabbitMQ与SpringBoot的集成是非常简单的。不过,SpringBoot集成RabbitMQ的方式是按照Spring的一套统一的MQ模型创建的,因此SpringBoot集成插件中对于生产者、消息、消费者等重要的对象模型,与RabbitMQ原生的各个组件有对应关系,但是并不完全相同。这一点需要在后续试验过程中加深理解。
9.2.1.引入依赖
SpringBoot官方集成了RabbitMQ,只需要快速引入依赖包即可使用。RabbitMQ与SpringBoot集成的核心maven依赖就下面一个。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
要特别注意下版本。我们这里采用的是SpringBoot的2.6.7版本的依赖发布包。不同版本下的配置方式会有变化。
然后所有的基础运行环境都在application.properties中进行配置。所有配置以spring.rabbitmq开头。通常按照示例进行一些基础的必要配置就可以跑了。关于详细的配置信息,可以参见RabbitProperties,源码中有各个字段说明。
如果需要更详细的配置资料,那就需要到官方的github仓库上去查了。但是国内访问github的这个速度,你是知道的。
9.2.2.配置生产者
基础的运行环境参数以及生产者的一些默认属性配置都集中到了application.properties配置文件中。所有配置项都以spring.rabbitmq开头。
关于详细的配置信息,可以参见RabbitProperties类的源码,源码中有各个字段的简单说明。
如果需要更详细的配置资料,那就需要去官方的github仓库上去查了。github地址: https://github.com/spring-projects/spring-amqp 。但是国内访问github的速度,你知道的。
9.2.3.声明队列
所有的exchange, queue, binding的配置,都需要以对象的方式声明。默认情况下,这些业务对象一经声明,应用就会自动到RabbitMQ上常见对应的业务对象。但是也是可以配置成绑定已有业务对象的。
9.2.4.使用RabbitmqTemplate对象发送消息
生产者的所有属性都已经在application.properties配置文件中进行配置。项目启动时,就会在Spring容器中初始化一个RabbitmqTemplate对象,然后所有的发送消息操作都通过这个对象来进行。
9.2.5.使用@RabbitListener注解声明消费者
消费者都是通过@RabbitListener注解来声明。注解中包含了声明消费者队列时所需要的重点参数。对照原生API,这些参数就不难理解了。
但是当要消费Stream队列时,还是要重点注意他的三个必要的步骤:
channel必须设置basicQos属性。 channel对象可以在@RabbitListener声明的消费者方法中直接引用,Spring框架会进行注入。
正确声明Stream队列。 通过往Spring容器中注入Queue对象的方式声明队列。在Queue对象中传入声明Stream队列所需要的参数。
消费时需要指定offset。 可以通过注入Channel对象,使用原生API传入offset属性。
使用SpringBoot框架集成RabbitMQ后,开发过程可以得到很大的简化,所以使用过程并不难,对照一下示例就能很快上手。但是,需要理解一下的是,SpringBoot集成后的RabbitMQ中的很多概念,虽然都能跟原生API对应上,但是这些模型中间都是做了转换的,比如Message,就不是原生RabbitMQ中的消息了。使用SpringBoot框架,尤其需要加深对RabbitMQ原生API的理解,这样才能以不变应万变,深入理解各种看起来简单,但是其实坑很多的各种对象声明方式。
9.2.6:关于Stream队列
在目前版本下,使用RabbitMQ的SpringBoot框架集成,可以正常声明Stream队列,往Stream队列发送消息,但是无法直接消费Stream队列了。
关于这个问题,还是需要从Stream队列的三个重点操作入手。SpringBoot框架集成RabbitMQ后,为了简化编程模型,就把channel,connection等这些关键对象给隐藏了,目前框架下,无法直接接入这些对象的注入过程,所以无法直接使用。
如果非要使用Stream队列,那么有两种方式,一种是使用原生API的方式,在SpringBoot框架下自行封装。另一种是使用RabbitMQ的Stream 插件。在服务端通过Strem插件打开TCP连接接口,并配合单独提供的Stream客户端使用。这种方式对应用端的影响太重了,并且并没有提供与SpringBoot框架的集成,还需要自行完善,因此选择使用的企业还比较少。
这里就不详细介绍使用方式了。关于Stream插件的使用和配置方式参见官方文档:https://www.rabbitmq.com/stream.html。配合Stream插件使用的客户端有Java和GO两个版本。其中Java版本客户端参见git仓库:https://github.com/rabbitmq/rabbitmq-stream-java-client 。
其实关于Stream队列,现在也不需要着急上手,只是把他当作一种特殊的队列类型,上手了解即可。因为一方面太新的技术,往往还需要小白鼠多多验证。另一方面,现在RabbitMQ多种队列并存的状态,在不久肯定会得到简化,到时候,应用层的使用方式也肯定会跟着变化。
写在最后
如果您觉得这些文章对您有所启发和帮助,何不将它们与您的好友分享呢?这样,他们也能够享受其中的精彩内容,并从中获得启发。谢谢您的支持与分享!~
同时也希望您用发财的手帮忙点个关注,可以通过下方菜单点击福利领取上千套简历模板、几千道的面试题pdf以及几百G涵盖了Java开发,前端开发,小程序开发,数据库,测试等等的相关学习书籍与资料。

另外也可以通过点击交流群按钮添加我好友,然后拉您到自己的创建的Java知识分享群。一起去讨论、学习、成长、进步,谢谢~




