
概述
持久化
消息的可靠性是RabbitMQ的特色,RabbitMQ通过持久化机制保证消息可靠性。为了保证RabbitMQ服务器在重启、崩溃、断电等异常情况下消息没有丢失,除了对消息本身持久化以外、还需要对消息队列、交换器信息进行持久化。
队列持久化
当对队列设置持久化后,称之为持久化队列,存储在磁盘上,RabbitMQ服务器异常重启后持久化队列依然存在。未设置持久化队列称之为暂存队列。
消息持久化
当对消息设置持久化后,由于异常重启后持久化消息依然存在。在RabbitMQ中,消息是存放在队列中的。如果我们仅仅对队列持久化而消息不持久化,持久化队列在异常重启后依然存在,持久化队列中的消息不会在异常重启后存在;对消息设置了持久化而对队列未设置持久化,异常重启后队列不存在;因此我们对队列和消息都需要执行持久化设置。
交换器持久化
为了实现一对多(一方发送多方接收)的消息分发,通常需要使用发布订阅模式。交换器是实现发布订阅模式的基础(DirectExchange、TopicExchange、FanoutExchange、HeaderExchange).未设置交换器持久化发布订阅模式会出现如下问题:
消费者由于异常或断电断开了与RabbitMQ服务器的连接,消息无法成功从队列发送到接收方,未设置交换器(Exchange)持久化,RabbitMQ会自动将与其对应的队列删除。异常重启后消费方无法重新获取交换器中队列及未及时消费的消息。
生产者由于异常或断电的情况下,未设置交换器(Exchange)持久化,重启后交换器不存在,消息队列无法与交换器成功绑定,无法实现发布订阅模式。
消息确认
使用场景
在没有引入消息确认机制时,消费者成功从认为消息成消息队列接收到消息便被功消费,并且从消息队列中移除接收到的消息。在RabbitMQ消息投递过程中,会产生如下问题:
消息是否成功发送到交换器(Exchange)
消息是否成功与交换器的路由策略成功绑定并添加到消息队列
消费者是否成功消费了队列中的消息
这时我们需要引入消息确认机制来保证消息的准确发布和消费。
消息确认方式
| 确认方式 | 作用 |
| auto | 自动确认。消息被消费者成功获取后便意味着消费成功,从队列中删除。 |
| manual | 手动确认。消息被消费者成功获取后调用API,实现手动确认消费成功失败。防止消息未成功消费而队列消息丢失。 |
消息确认类型
| 确认类型 | 作用 |
| 消息发送确认 | 确认是否成功到达交换器(Exchange);确认是否将交换器类型与消息队列成功绑定并添加至消息队列 |
| 消息接收确认 | 确认消费者是否成功消费了消息队列中的消息 |
消息持久化
队列持久化
new Queue("fanoutqueue-one",true,false);
队列持久化,第一个参数为消息队列标识,消费者@RabbitListener中queues字段须与该标识一致,表示监听该消息队列消息并消费;第二个参数(durable:Boolean)设置为true表示队列持久化;第三个参数(autoDelete:Boolean)设置为false表示消费者客户端断开连接后是否自动删除消息队列(此处为不删除)。
交换器持久化
new FanoutExchange(fanoutExchangeName,true,false);
交换器持久化,第一个参数为交换器名称(唯一标识),第二个参数(durable:Boolean)设置为true标识交换器持久化;第三个参数(autoDelete:Boolean)设置为false表示所有绑定的消息队列不在使用时是否自动删除交换器(此处为不删除)。
消息持久化
this.rabbitTemplate.convertAndSend(fanoutExchangeName,"FanoutSend");
当我们使用rabbitTemplate.convertAndSend方法时默认实现了消息持久化;在使用FanoutExchange实现发布订阅模式时无需向使用TopicExchange时指定路由键(RoutingKey),只需要FanoutExchange唯一标识一致即可。
消息确认
#开启消息发送回调spring.rabbitmq.publisher-confirms=true#开启消息接收回调spring.rabbitmq.publisher-returns=true
发送确认
package com.springboot.demo.mq.rabbitmq.mqcallback;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.lang.Nullable;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** 消息是否成功根据ExchangeType添加到消息队列(是否成功发送)*/@Componentpublic class MsgSendConfirmCallback implements RabbitTemplate.ConfirmCallback {@AutowiredRabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);}/**** @param correlationData:消息唯一标识* @param b:发送确认成功或失败* @param s:失败原因*/@Overridepublic void confirm(@Nullable CorrelationData correlationData, boolean b, @Nullable String s) {System.out.println("消息唯一标识----------"+correlationData);System.out.println("消息发送成功失败--------"+b);System.out.print("失败原因-----------"+s);}}
消息接收确认
package com.springboot.demo.mq.rabbitmq.mqcallback;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Componentpublic class MsgReturnConfirmCallback implements RabbitTemplate.ReturnCallback {@AutowiredRabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnCallback(this);}/**** @param message:消息主体* @param replyCode 响应码* @param replyText 响应内容* @param exchange 交换器* @param routingKey 路由键*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("消息主体----------"+message);System.out.println("消息响应码--------"+replyCode);System.out.print("响应内容-----------"+replyText);System.out.println("使用交换器--------"+exchange);System.out.println("使用路由键--------"+routingKey);}}
消息确认与实体关联
在我们实现消息发送确认的过程中,通过实现RabbitTemplate.ConfirmCallback,我们可以直到消息是否成功发送,但是无法获知具体哪条消息发送成功或失败,需要将Message和消息唯一标识(CorrelationData)进行绑定,就可以获知具体哪条消息发送成功或失败。
UserInfo userInfo=new UserInfo();userInfo.setUserId(10);userInfo.setUserName("RabbitMQTest");userInfo.setUserAge(28);userInfo.setUserSex("1");userInfo.setUserState("1");userInfo.setSendDate(new Date());//将实体转换为JSON字符串String jsonInfo= JSONObject.toJSONString(userInfo);//生成消息唯一标识String messageId= UUID.randomUUID().toString();/*** 将消息唯一标识(messageId)与消息主体绑定* 通过MessageBuilder生成消息主体Message* withBody(entity):设置消息内容* setContentType():设置消息内容类型* setCorrelationId():设置消息唯一标识* build():根据设置内容创建消息*/Message message=MessageBuilder.withBody(jsonInfo.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setCorrelationId(messageId).build();/*** 将消息唯一标识(messageId)与CorrelationData绑定*/CorrelationData correlationData=new CorrelationData(messageId);this.rabbitTemplate.convertAndSend(fanoutExchangeName,"FanoutSend",message,correlationData);




