
目录
- 一、概述
- 二、RabbitMQ构造图
- 三、JAVA操作RabbitMQ
- 3.1、RabbitMQ通信方式
- 3.2、Java操作RabbitMQ
- 3.3、Java实现RabbitMQ通信方式
- 3.3.1、方式一:"Hello World!"
- 3.3.2、方式二:Work queues
- 3.3.3、方式三:Publish/Subscribe
- 3.3.4、方式四:Routing
- 3.3.5、方式五:Topics
- 3.4、延时队列
- 四、RabbitMQ的问题解决
- 五、RabbitMQ整合SpringBoot
- 六、RabbitMQ集群
- 七、应用场景及解决方案
一、概述
1.1、概述
概述
RabbitMQ 相对后来者 Kafka,功能更加单一,是一款典型的“只做好一件事”的软件。
其核心抽象是一个队列 (Queue) ,用户通过 RabbiMQ 定义一组队列用于将内容在不同应用之间传递。
几款MQ对比:ActiveMQ、RocketMQ、Kafka、RabbitMQ
- 支持语言
- ActiveMQ、RocketMQ仅支持Java
- Kafka、RabbitMQ支持多种语言
- 效率
- ActiveMQ、RocketMQ、Kafka是毫秒级别
- RabbitMQ是微秒级别
- 学习成本
- RabbitMQ较为简单
1.2、Docker安装RabbitMQ
-
查看镜像
docker search rabbitmq:management -
拉取镜像(拉取带management的原因是这种带网页端图形化管理界面)
docker pull rabbitmq:management -
启动容器
-
命令方式启动
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management注意:这里两个-p,分别代表 -p 服务器端口号:容器端口号 -p 服务器端口号:容器端口号
- 5672用于提供Rabbitmq服务
- 15672用于提供网页端图形化管理界面访问
-
docker-compose.yml方式启动
version: "3.1" services: rabbitmq: image: daocloud.io/library/rabbitmq:management restart: always container_name: rabbitmq ports: - 5672:5672 - 15672:15672 volumes: # 由于rabbitmq可以对消息进行持久化 - ./data:/var/lib/rabbitmq# 使用docker-compose启动 docker-compose up -d
-
-
查看是否启动
docker ps
1.3、RabbitMQ图形管理界面使用
Rabbit可视化界面基本使用参看:python操作消息中间件RabbitMQ
二、RabbitMQ构造图
一个RabbitMQ可以有多个Virtual Host
一个Virtual Host可以有多个Exchange
一个Exchange多个Queue

三、JAVA操作RabbitMQ
3.1、RabbitMQ通信方式
3.2、Java操作RabbitMQ
导入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
创建连接
public class RabbitMQClient {
public static Connection getConection(){
// 创建Connection工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("test");
factory.setPassword("test");
factory.setVirtualHost("/test");
// 创建Connection
Connection conn = null;
try {
conn = factory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
// 返回连接
return conn;
}
}
测试连接
public void getConnection() throws IOException {
Connection connection = RabbitMQClient.getConection();
connection.close();
}
3.3、Java实现RabbitMQ通信方式
3.3.1、方式一:“Hello World!”

特点
一个生产者、一个默认的交换机、一个队列、一个消费者
一个队列一个消费者
代码实现
-
生产者
public class Publisher { @Test public void publish() throws Exception { // 1、获取连接对象 Connection connection = RabbitMQClient.getConection(); // 2、创建Channel Channel channel = connection.createChannel(); // 3、发布消息到exchange,同时指定路由的规则 String msg = "Hello-World-msg!"; // 参数1:指定exchange, 默认的使用"" // 参数2:指定路由规则(routingKey),使用具体的队列名称 // 参数3:指定传递消息所携带的properties(属性) // 参数4:指定发布消息的具体内容,byte[]类型 channel.basicPublish("", "Hello World", null, msg.getBytes()); System.out.println("生产者发布消息成功!"); // 4、释放资源 channel.close(); connection.close(); } } -
消费者
public class Consumer { @Test public void consume() throws Exception { // 1、获取连接对象 Connection connection = RabbitMQClient.getConection(); // 2、创建Channel Channel channel = connection.createChannel(); // 3、声明队列(一定要与生产者指定的队列相同) // 参数1:queue-指定消费队列的名称 // 参数2:durable-当前队列是否需要持久化(true) // 参数3:exclusive-是否排外(效果:1、conn.close-当前队列将会被消除;2、当前队列只能被一个消费者消费) // 参数4:autoDelete-如果这个队列没有消费者消费,则自动删除 // 参数5:arguments-指定当前队列的其他信息(比如最大存多少) channel.queueDeclare("Hello World", true, false, false, null); // 4、开启监听队列并消费 // 创建消费者,并重写handleDelivery方法,对消息进行处理 DefaultConsumer consume = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到消息:" + new String(body, "UTF-8")); } }; // 参数1:queue-指定消费队列的名称 // 参数2:autoAck-指定是否自动ACK(true:接到消息后,会立即告诉RabbitMQ消费完毕,false:需要手动) // 参数3:consumer-指定消费回调 channel.basicConsume("Hello World", true, consume); // 消费掉 System.in.read(); // 【保证不停止运行,一直监听】 // 5、释放资源 channel.close(); connection.close(); } }
3.3.2、方式二:Work queues

特点
一个发布者、一个默认的交换机、一个队列、两个消费者
一个队列两个消费者
原理
【问题】:RabbitMQ默认是平均分配的,但是由于不同消费者能力不同,如果平均分配会导致有的很闲有的压力很大
【解决】:1、指定消费能力basicQos。2、手动ACK
代码实现
-
生产者
channel.basicPublish("", "Work", null, msg.getBytes()); //往Work队列中发送消息 -
消费者
-
消费者1
public class Consumer { @Test public void consume() throws Exception { // 1、获取连接对象 Connection connection = RabbitMQClient.getConection(); // 2、创建Channel Channel channel = connection.createChannel(); // 3、声明队列(一定要与生产者指定的队列相同) channel.queueDeclare("Work", true, false, false, null); // 【改动1】:指定消费能力 channel.basicQos(1); // 4、开启监听队列并消费 DefaultConsumer consume = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1号,接收到消息:" + new String(body, "UTF-8")); // 【改动2】手动ack channel.basicAck(envelope.getDeliveryTag(), false); } }; //【改动2】:false(换为手动ACK) channel.basicConsume("Work", false, consume); // 消费掉 System.in.read(); // 【保证不停止运行,一直监听】 // 5、释放资源 channel.close(); connection.close(); } } -
消费者2
channel.basicQos(2); // 【改动1】:指定消费能力 channel.basicAck(envelope.getDeliveryTag(), false); // 【改动2】手动ack channel.basicConsume("Work", false, consume); //【改动2】:false(换为手动ACK)
-
3.3.3、方式三:Publish/Subscribe

特点
一个生产者,一个自己创建的交换机,两个队列,两个消费者
两个队列两个消费者,每个队列中发一份相同的
原理
-
生产者:
- 创建一个
FANOUT类型的exchange - 并把多个队列绑定到交换机上(exchange)
- 发送消息指定上交换机
- 创建一个
-
消费者:指定要监听的队列
代码实现
-
生产者
public class Publisher { @Test public void publish() throws Exception { // 1、获取连接对象 Connection connection = RabbitMQClient.getConection(); // 2、创建Channel Channel channel = connection.createChannel(); // 【改动1】指定交换机「交换机名, 交换机类型, 是否持久化」 channel.exhangeDeclare("PS-exchange", BuiltExchangeType.FANOUT, true) // 【改动2】绑定指定队列「队列,交换机,绑定形式」 channel.queueBind("ps-queue1", "PS-exchange", ""); channel.queueBind("ps-queue2", "PS-exchange", ""); // 3、发布消息到exchange,同时指定路由的规则 String msg = "Publish/Subscribe-msg"; //【改动3】:填写指定上交换机,队列置为空(不指定队列) channel.basicPublish("PS-exchange", "", null, msg.getBytes()); System.out.println("生产者发布消息成功!"); // 4、释放资源 channel.close(); connection.close(); } } -
消费者
-
消费者1
public class Consumer { @Test public void consume() throws Exception { // 1、获取连接对象 Connection connection = RabbitMQClient.getConection(); // 2、创建Channel Channel channel = connection.createChannel(); // 3、声明队列(一定要与生产者指定的队列相同) // 【改动1】:指定监听的队列ps-queue1 channel.queueDeclare("ps-queue1", true, false, false, null); // 4、开启监听队列并消费 DefaultConsumer consume = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到消息:" + new String(body, "UTF-8")); } }; // 【改动2】:指定消费的队列ps-queue1 channel.basicConsume("ps-queue1", true, consume); // 消费掉 System.in.read(); // 【保证不停止运行,一直监听】 // 5、释放资源 channel.close(); connection.close(); } } -
消费者2
// 【改动1】:指定监听的队列ps-queue2 channel.queueDeclare("ps-queue1", true, false, false, null); // 【改动2】:指定消费的队列ps-queue2 channel.basicConsume("ps-queue1", true, consume); // 消费掉
-
3.3.4、方式四:Routing

特点
一个生产者,一个自己创建的交换机,两个队列,两个消费者
两个队列两个消费者,每个队列中可指定(精确指定)发不同的消息
原理
- 生产者:
- 创建一个
DIRECT类型的exchange - 把多个队列绑定到交换机上(exchange)且指定不同的发送规则(精确匹配)
- 发送消息绑定上交换机,并设定不同的发送规则
- 创建一个
- 消费者:指定要监听的队列
代码实现
-
生产者
public class Publisher { @Test public void publish() throws Exception { // 1、获取连接对象 Connection connection = RabbitMQClient.getConection(); // 2、创建Channel Channel channel = connection.createChannel(); //【改动1】:创建exchange channel.exhangeDeclare("routing-exchange", BuiltExchangeType.DIRECT) // 【改动2】绑定指定队列「队列,交换机,发送规则」 channel.queueBind("r-queue_error", "routing-exchange", "ERROR"); channel.queueBind("r-queue_info", "routing-exchange", "INFO"); // 3、发布消息到exchange,同时指定路由的规则 String msg = "Hello-World-msg!"; // 【改动3】设定发送规则 channel.basicPublish("routing-exchange", "ERROR", null, msg.getBytes()); channel.basicPublish("routing-exchange", "INFO", null, msg.getBytes()); System.out.println("生产者发布消息成功!"); // 4、释放资源 channel.close(); connection.close(); } } -
消费者
-
消费者1
public class Consumer { @Test public void consume() throws Exception { // 1、获取连接对象 Connection connection = RabbitMQClient.getConection(); // 2、创建Channel Channel channel = connection.createChannel(); // 3、声明队列(一定要与生产者指定的队列相同) // 【改动1】:指定监听的队列ps-queue1 channel.queueDeclare("r-queue_error", true, false, false, null); // 4、开启监听队列并消费 DefaultConsumer consume = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到消息:" + new String(body, "UTF-8")); } }; // 【改动2】:指定消费的队列ps-queue1 channel.basicConsume("r-queue_error", true, consume); // 消费掉 System.in.read(); // 【保证不停止运行,一直监听】 // 5、释放资源 channel.close(); connection.close(); } } -
消费者2
// 【改动1】:指定监听的队列r-queue_info channel.queueDeclare("r-queue_info", true, false, false, null); // 【改动2】:指定消费的队列r-queue_info channel.basicConsume("r-queue_info", true, consume); // 消费掉
-
3.3.5、方式五:Topics

特点
一个生产者,一个自己创建的交换机,两个队列,两个消费者
两个队列两个消费者,每个队列中可指定(模糊指定)发不同的消息
原理
- 生产者:
- 创建一个
TOPIC类型的exchange - 把多个队列绑定到交换机上(exchange)且指定不同的发送规则(模糊匹配)
- 发送消息绑定上交换机,并设定不同的发送规则
- 创建一个
- 消费者:指定要监听的队列
代码实现
-
生产者
public class Publisher { @Test public void publish() throws Exception { // 1、获取连接对象 Connection connection = RabbitMQClient.getConection(); // 2、创建Channel Channel channel = connection.createChannel(); //【改动1】:创建exchange channel.exhangeDeclare("topic-exchange", BuiltExchangeType.TOPIC) // 【改动2】绑定指定队列「队列,交换机,发送规则」*:1个xxx #:xxx.xxx.xxx.... channel.queueBind("topic-queue_color", "topic-exchange", "color.*"); channel.queueBind("topic-queue_speed", "topic-exchange", "speed.#"); // 3、发布消息到exchange,同时指定路由的规则 String msg = "Hello-World-msg!"; // 【改动3】设定发送规则 channel.basicPublish("routing-exchange", "color.red", null, msg.getBytes()); channel.basicPublish("routing-exchange", "speed.fast.red", null, msg.getBytes()); System.out.println("生产者发布消息成功!"); // 4、释放资源 channel.close(); connection.close(); } } -
消费者
-
消费者1
public class Consumer { @Test public void consume() throws Exception { // 1、获取连接对象 Connection connection = RabbitMQClient.getConection(); // 2、创建Channel Channel channel = connection.createChannel(); // 3、声明队列(一定要与生产者指定的队列相同) // 【改动1】:指定监听的队列ps-queue1 channel.queueDeclare("r-queue_color", true, false, false, null); // 4、开启监听队列并消费 DefaultConsumer consume = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到消息:" + new String(body, "UTF-8")); } }; // 【改动2】:指定消费的队列ps-queue1 channel.basicConsume("r-queue_color", true, consume); // 消费掉 System.in.read(); // 【保证不停止运行,一直监听】 // 5、释放资源 channel.close(); connection.close(); } } -
消费者2
// 【改动1】:指定监听的队列r-queue_info channel.queueDeclare("r-queue_speed", true, false, false, null); // 【改动2】:指定消费的队列r-queue_info channel.basicConsume("r-queue_speed", true, consume); // 消费掉
-
3.4、延时队列
3.4.1、TTL
概述
【全称】:Time To Live(存活时间/过期时间)
【特点】1、当消息到达存活时间后,还没有被消费,会被自动清除
2、RabbitMQ可以对消息设置过期时间,也可以为整个队列设置过期时间
3、消息过期后,只有在队列顶端,才会判断其是否过期(避免轮询判断消耗性能)
代码实现
-
队列的统一过期
// 2、创建Channel Channel channel = connection.createChannel(); // 设置队列过期时间 Map<String, Object> args = new HashMap<String, Object>(); args.put("x-message-ttl", 60000); // 创建队列:queueName, durable, exclusive, autoDelete, args channel.queueDeclare("publish-queue1", true, false, false, args); -
消息的过期
// 3、发布消息到exchange,同时指定路由的规则 String msg = "Hello-World!"; // 设置过期时间 AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("5"); AMQP.BasicProperties properties = builder.build(); channel.basicPublish("", "Hello World", properties, msg.getBytes());
3.4.2、死信队列
概述
【全称】:DLX , Dead Letter Exchange(私信交换机)
【概述】:当消息成为Dead Message 后,可以被重新发送到另一个交换机上,这个交换机就是DLX
【消息成为内死信的三种情况】
1、队列消息长度达限制
2、消费者拒接消息(basicNack/basicReject),并不把消息重新放回原队列(requeue=false)
3、消息超时未被消费
代码实现
// 1、创建普通Exchange
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
// 2、创建队列,绑定到1上和3上
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", routingKey1);// routingKey1:指定2-->3的规则
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey2); // routingKey2:指定1-->2的规则
// 3、创建死信Exchange,绑定到2上
channel.exchangeDeclare(exchangeName2, "topic", true, false, null);
// 4、创建普通队列,绑定到3上
channel.queueDeclare(queueName2, true, false, false, null);
channel.queueBind(queueName2, exchangeName2, routingKe3);// routingKey3:指定3-->4的规则
3.4.3、延时队列
概述
【概述】:消息进入队列中并不会被消费,之后到达指定时间后,才会被消费。
【应用场景】:1、下单后,30分钟未支付,取消订单
2、新注册用户成功7天后,发送短信问候
【解决方案对比】:1、定时器:轮询查询数据库检测是否支付,十分消耗性能,不优雅
2、延时队列:只需查询一次数据库,优雅
【延时队列原理】:使用TTL + 死信队列组合实现延时队列

代码实现
// 【DLX 创建死信队列】
// 1、创建普通Exchange
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
// 2、创建队列,绑定到1上和3上
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", routingKey1);// routingKey1:指定2-->3的规则
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey2); // routingKey2:指定1-->2的规则
// 3、创建死信Exchange,绑定到2上
channel.exchangeDeclare(exchangeName2, "topic", true, false, null);
// 4、创建普通队列,绑定到3上
channel.queueDeclare(queueName2, true, false, false, null);
channel.queueBind(queueName2, exchangeName2, routingKe3);// routingKey3:指定3-->4的规则
// 【TTL 发消息设置过期时间】
String msg = "Hello-World!";
// 设置过期时间
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("1800000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName2, "Hello World", properties, msg.getBytes());
四、RabbitMQ的问题解决
4.1、防止消息丢失(消息可靠性)
4.1.1、丢失场景
可能有如下4种场景会导致消息丢失
- 【场景1】:生产者发送消息时,网络等问题,导致未发送到MQ的Exchange
- 【场景2】:Exchange到Queue过程中MQ突然宕机了
- 【场景3】:消息已经到达MQ中的Queue,但是MQ突然宕机了
- 【场景4】:消费者消费消息时,取出了消息,还未处理完对应的业务,消费者宕机了

4.1.2、解决【场景1】
【场景1】:生产者发送消息时,网络等问题,导致未发送到MQ的Exchange
【解决方法】:使用事务/Comfirm
事务操作
【功能】:事务可以保证消息100%传递,可以通过事物的回滚去记录到日志中,后面定时再次发送当前消息。
【弊端】:事务效率太低,比平时操作效率低上100倍以上。
Comfirm方式
【功能】:和RabbitMQ事务功能一样,并且效率高很多
-
普通Comfirm方式
// 3、发布消息到exchange,同时指定路由的规则 // 3.1 开启confirm channel.confirmSelect(); // 3.2 发送消息 String msg = "Hello-World!"; channel.basicPublish("", "Hello World", null, msg.getBytes()); // 3.3 判断是否发送成功 if(channel.waitForConfirms()){ System.out.println("发送成功"); }else{ System.out.println("发送失败, 执行写入日志文件/数据库等业务操作"); } -
批量Comfirm方式
// 3、发布消息到exchange,同时指定路由的规则 // 3.1 开启confirm channel.confirmSelect(); // 3.2 发送消息 for (int i=0; i<1000; i++){ String msg = "Hello-World!" + i; channel.basicPublish("", "Hello World", null, msg.getBytes()); } // 3.3 判断是否全部成功:当发送的消息有1个失败,则全部失败,抛出异常IOException channel.waitForConfirmsOrDie(); -
异步Comfirm方式
发送消息和发送结果返回是异步操作的,所以,发送结果返回不影响消息的继续发送
// 3、发布消息到exchange,同时指定路由的规则 // 3.1 开启confirm channel.confirmSelect(); // 3.2 发送消息 for (int i=0; i<1000; i++){ String msg = "Hello-World!" + i; channel.basicPublish("", "Hello World", null, msg.getBytes()); } // 3.3 异步发送 channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long l, boolean b) throws IOException { System.out.println("消息发送成功,标识:" + l + "是否是批量:" + b); } @Override public void handleNack(long l, boolean b) throws IOException { System.out.println("消息发送失败,标识:" + l + "是否是批量:" + b); } }); // 由于是异步,方便看效果 // System.in.read();
4.1.3、解决【场景2】
【场景2】:Exchange到Queue过程中MQ突然宕机了
【解决方法】:使用return机制
return机制
// 2、创建Channel
Channel channel = connection.createChannel();
// 开启return机制
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
// 当消息没有送达队列中才会执行
System.out.println(new String(bytes, "UTF-8") + "没有送到队列中");
}
});
// 3、发布消息到exchange,同时指定路由的规则
// 3.1 开启confirm
channel.confirmSelect();
// 3.2 发送消息
for (int i=0; i<1000; i++){
String msg = "Hello-World!" + i;
// 【这里要设置为true才行(默认是false)】
channel.basicPublish("", "Hello World", true, null, msg.getBytes());
}
4.1.4、解决【场景3】
【场景3】:消息已经到达MQ中的Queue,但是MQ突然宕机了
【解决方法】:持久化(RabbitMQ提供了持久化的功能)
持久化操作
-
设置交换器的持久化
// 三个参数分别为 交换器名、交换器类型、是否持久化 channel.exchangeDeclare(EXCHANGE_NAME, "topic", true); -
设置队列的持久化
// 参数1 queue :队列名 // 参数2 durable :是否持久化 // 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除 // 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列 // 参数5 arguments channel.queueDeclare(QUEUE_NAME, true, false, false, null); -
设置消息的持久化
// 参数1 exchange :交换器 // 参数2 routingKey : 路由键 // 参数3 props : 消息的其他参数,其中 MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化 // 参数4 body : 消息体 channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
4.1.5、解决【场景4】
【场景4】:消费者消费消息时,取出了消息,还未处理完对应的业务,消费者宕机了
【解决方法】:设置手动ACK即可(取出消息后等到业务理完毕,在手动执行ACK)
手动ACK
// 4、开启监听队列并消费
DefaultConsumer consume = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1号,接收到消息:" + new String(body, "UTF-8"));
// 【改动2】手动ACK
try{
// 手动ACK
channel.basicAck(envelope.getDeliveryTag(), false);
}catch (Exception e) {
//重新放入队列
//channel.basicNack(envelope.getDeliveryTag(), false, true);
//抛弃此条消息
//channel.basicNack(envelope.getDeliveryTag(), false, false);
e.printStackTrace();
}
}
};
//【改动2】:false(换为手动ACK)
channel.basicConsume("Work", false, consume); // 消费掉
4.2、防止重复消费(保障幂等性)
原因
消费者1在消费此消息,还没给rabbitmq一个ack,该消息就又被消费者2拿到进行消费
解决原理
- 1、生产者发送消息时,生成惟一的msgId,与消息一起发送
- 2、消费者1拿到消息,把msgId:0(0代表还未消费完),以setnx+setex的形式放入redis
- 3、消费者1消费完消息,把msgId:1(1代表消费完毕),以setex的形式放入redis,并手动ack
- 4、消费者2拿到同样的消息,把msgId:0以setnx+setex的形式放入redis,会失败
- 5、消费者2 get(msgId),进行判断:如果是0,说明1在消费,直接返回;如果是1,说明1已经消费完,做手动ack。

代码实现
-
生产者
发送消息时,指定消息id
// 3.2 发送消息 // 添加唯一标识 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(1) // 指定是否持久化到消息队列中(1:是;2:否) .messageId(UUID.randomUUID().toString()).build(); String msg = "Hello-World!"; // 存入messageId channel.basicPublish("", "Hello World", true, properties, msg.getBytes()); -
消费者
消费消息时,根据逻辑操作redis,用于ack
// 4、开启监听队列 // 创建消费者,并重写handleDelivery方法,对消息进行处理 DefaultConsumer consume = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { Jedis jedis = new Jedis("localhost", 6379); String messageId = properties.getMessageId(); // 获得messageId // 1、setnx到redis中,默认指定value为0, (NX:setnx EX:setex)过期时间为10s String result = jedis.set(messageId, "0", "NX", "EX", 10); if(result != null && result.equalsIgnoreCase("OK")){ System.out.println("接收到消息" + new String(body, "UTF-8")); // 2、消费成功,set messageId 1 jedis.set(messageId, "1", "EX", 10); // 存在该key直接覆盖 channel.basicAck(envelope.getDeliveryTag(), false); }else{ // 3、如过1中setnx失败,获取key对应的value,0:啥也不做,如果是1:指定ack回调 String s = jedis.get(messageId); if("1".equalsIgnoreCase(s)){ channel.basicAck(envelope.getDeliveryTag(), false); } } } };
4.3、防止消息乱序
4.4、解决消息积压
TTL、死信队列、延迟队列、
五、RabbitMQ整合SpringBoot
5.1、快速实现
导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
编写配置文件
spring:
rabbitmq:
host: localhost
port: 5672
username: test
password: test
virtual-host: /test
# 下面根据具体需求再配置
listener:
simple:
acknowledge-mode: manual # 手动ACK
concurrency:1 # 最小消费者数量
max-concurrency:10 # 最大消费者数量
prefetch: 1 # unack的最大数量(限流)
publisher-confirm-type: simple # confirm(防丢失)
publisher-returns: true # return(防丢失)
编写RabbitMQ的配置类:方便统一管理
public class RabbitMQConfig{
// 1、创建exchange-topic
@Bean
public TopicExchange getTopicExchange(){
// Exchange名、持久化、不自动删除
return new TopicExchange("boot-topic-exchange", true, false);
}
// 2、创建队列
@Bean
public Queue getQueue(){
// 队列名、持久化、排外、不自动删除、其他
return new Queue("boot-queue", true, false, false, null);
}
// 3、绑定exchange和队列、指定发送规则
@Bean
public Binding getBinding(TopicExchange topicExchange, Queue queue){
return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*");
}
}
生产者
@Autowired
private RabbitTemplate rabbitTemplate;
// 发送消息
rabbitTemplate.convertAndSend("boot-topic-exchange", "slow.red.rabbit", "慢红兔");
消费者
@RabbitListener(queues = "boot-queue")
public void getMassage(Object massage){
// 可以获得RoutingKey(*.red.*)
String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey()
System.out.println("接收到队列" + massage);
}
5.2、消息丢失:Comfirm&Return
编写配置文件
spring:
rabbitmq:
host: localhost
port: 5672
username: test
password: test
virtual-host: /test
listener:
simple:
acknowledge-mode: manual
publisher-confirm-type: simple
publisher-returns: true
加个PublisherConfirmAndReturnConfig
Comfirm:生产者发送消息时,网络等问题,导致未发送到MQ的Exchange
Return:Exchange到Queue过程中MQ突然宕机了
@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct // 创建对象时,会执行该方法
public void initMethod(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if (b){
System.out.println("消息已经送达exchange");
}else{
System.out.println("消息未送达到exchange");
}
}
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("消息未送达到queue中");
}
}
5.3、消息丢失:持久化
只需要定义 exchange 和 queue 的持久化。
持久化操作
-
直接在配置类中指定即可
public class RabbitMQConfig{ // 1、创建exchange-topic @Bean public TopicExchange getTopicExchange(){ // Exchange名、【持久化】、不自动删除 return new TopicExchange("boot-topic-exchange", true, false); } // 2、创建队列 @Bean public Queue getQueue(){ // 队列名、【持久化】、排外、不自动删除、其他 return new Queue("boot-queue", true, false, false, null); } // 3、绑定exchange和队列、指定发送规则 @Bean public Binding getBinding(TopicExchange topicExchange, Queue queue){ return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*"); } }
5.4、消息丢失:手动ACK
默认是消费完自动ACK(确认消费),手动ACK是消费完后不确认消费,在合适位置设定确认消费
编写配置文件
spring:
rabbitmq:
host: localhost
port: 5672
username: test
password: test
virtual-host: /test
listener:
simple:
acknowledge-mode: manual
在消费消息位置,设置手动ack
@RabbitListener(queues = "boot-queue")
public void getMassage(String msg, Channel channel, Message message) throws IOException {
System.out.println("接收到队列" + msg);
// 手动ack
try{
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e){
//重新放入队列
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//抛弃此条消息
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
e.printStackTrace();
}
}
5.5、消息重复
-
生产者
发送消息时,指定消息id
// 发送消息 CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("boot-topic-exchange", "slow.red.rabbit", "慢红兔", messageId); -
消费者
消费消息时,根据逻辑操作redis,用于ack
@Component public class Consumer { @Autowired private StringRedisTemplate redisTemplate; @RabbitListener(queues = "boot-queue") public void getMassage(String msg, Channel channel, Message massage){ // 0、获取messageId String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation"); // 1、设置key到redis(如果不存(未被消费)在则进入2,存在(已被消费)进入5) if (redisTemplate.opsForValue().setIfAbsent(messageId, "0", 10, TimeUnit.SECONDS)){ // 2、消费消息 System.out.println("接收到消息:" + msg); // 3、设置key的值为为1(覆盖设置) redisTemplate.opsForValue().set(messageId, "1", 10, TimeUnit.SECONDS); // 4、手动ACK channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }else { // 5、获取key的值,为1说明已经消费完毕,但是没有ACK,直接手动ACK即可;为0说明在消费,啥也不用做 if (StringUtils.equals("1", redisTemplate.opsForValue().get(messageId))){ // 6、手动ACK channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } } }
5.6、TTL、DLX、延时队列
TTL
-
队列的统一过期
// 2、创建队列 @Bean public Queue getQueue(){ // 设置队列过期时间 Map<String, Object> args = new HashMap<String, Object>(); args.put("x-message-ttl", 60000); // 队列名、持久化、排外、不自动删除、其他 return new Queue("boot-queue", true, false, false, args); } -
消息过期
// 3、发布消息到exchange,同时指定路由的规则 String msg = "慢红兔"; // 设置过期时间 MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("5000"); return message; } }; // 发送消息 rabbitTemplate.convertAndSend("boot-topic-exchange", "slow.red.rabbit", msg, messagePostProcessor);
DLX
@Configuration
public class RabbitMQConfig {
// 1、创建创建普通Exchange
@Bean
public TopicExchange getTopicExchange(){
// Exchange名、持久化、不自动删除
return new TopicExchange(exchangeName, true, false);
}
// 2、创建中间队列,并绑定到死信交换机上(2-->3)
@Bean
public Queue getQueue(){
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", routingKey1);
// 队列名、持久化、排外、不自动删除、其他
return new Queue(queueName, true, false, false, agruments);// 创建并绑定死信交换机
}
// 3、创建普通队列
@Bean
public Queue getQueue2(){
// 队列名、持久化、排外、不自动删除、其他
return new Queue(queueName2, true, false, false, null);
}
// 4、绑定中间队列到普通exchange(1--->2)
@Bean
public Binding getBinding(TopicExchange topicExchange, Queue queue){
return BindingBuilder.bind(queue).to(topicExchange).with(routingKey2);
}
// 5、绑定普通队列到死信交换机(3--->4)
@Bean
public Binding getBinding2(TopicExchange topicExchange, Queue queue){
return BindingBuilder.bind(queue).to(topicExchange).with(routingKey3);
}
}
延时队列
-
创建死信队列(DLX)
// 如DLX的 "RabbitMQConfig" -
生产者发送消息指定过期时间(TTL)
// 如TTL的 "消息过期"
六、RabbitMQ集群
没有集群搭建的实践,故此处仅做架构描述
6.1、集群架构
主备模式:主节点挂了,子节点依旧可以提供服务
- 主备节点共享数据
- 主节点挂掉,备份节点变为主节点,挂掉的主节点恢复后将变为备份节点
远程模式:跨地域的两个mq集群互联,可以实现双活
- 当上游集群挂掉/压力过大,可以转接到下游集群
- 配置复杂,用的不多
镜像模式:数据同步,保证数据100%不丢失
-
较为常用的一种模式
-
高可靠(某个节点挂掉,其他节点依旧可以提供服务)
-
不利于横向扩容(每个节点数据一致,横向扩容没有意义,反而增加数据同步的负担),建议奇数个节点(3个为最小奇数集群节点)
多活模式:异地多活+数据共享,解决镜像模式无法横向扩容的缺点
- 依赖RabbitMQ的federation插件,可以实现持续可靠的AMQP数据通信
- 不需要写复杂配置文件,可不停机配置
- 在多套数据中心中各部署一套RabbitMQ集群(异地多活),各中心的RabbitMQ除了提供正常业务服务外,还会实现部分队列消息共享(数据共享)。
七、应用场景及解决方案
6.1、应用解耦&异步提速
【不使用MQ】
所有应用同步操作,一方面比较耗时,另一方面,一旦其中一个应用挂掉,所有应用则无法继续使用

【使用MQ】
应用解耦,所有应用异步操作,减少时耗、避免一个应用挂掉导致所有应用不能使用

6.2、通知(消息发布订阅)
在使用websocket实现跨服务器聊天时,可把Redis换为MQ,使用Hello World的通信方式。


6.3、削峰填谷
当 上下游系统 处理能力存在差距的时候,利用 MQ 做一个 “漏斗” 模型,进行 流控。把 MQ 当成可靠的 消息暂存地,进行一定程度的 消息堆积;在下游有能力处理的时候,再发送消息。
MQ 的流量削峰常用于高并发场景(例如:秒杀、团抢等业务场景),它是缓解瞬时暴增流量的核心手段之一。
【解决方案】: 消费端限流
1、添加rabbitmq,并设置手动ACK方式
2、适当添加Thread.sleep(100);
3、配置文件设置spring.rabbitmq.listener.simple.prefetch=1 //unack的最大数量
6.4、数据分发
使用Routing或Topic的通信方式进行数据分发


6.5、延时任务
我们经常会遇到如下场景
1、定时器:轮询查询数据库检测是否支付,十分消耗性能,不优雅
2、延时队列:只需查询一次数据库,优雅
如果使用定时器去做,将会不断地轮询查询数据库,性能较差,十分不优雅,而RabbitMQ可以使用
TTL+DLX实现延时队列,完成定时任务的需求。





