暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

SpringBoot2.x学习笔记——消息持久化与消息确认机制(ACK)

CodeWu 2019-05-19
466


概述

   持久化 

    消息的可靠性是RabbitMQ的特色,RabbitMQ通过持久化机制保证消息可靠性。为了保证RabbitMQ服务器在重启、崩溃、断电等异常情况下消息没有丢失,除了对消息本身持久化以外、还需要对消息队列、交换器信息进行持久化。

    队列持久化

    当对队列设置持久化后,称之为持久化队列,存储在磁盘上,RabbitMQ服务器异常重启后持久化队列依然存在。未设置持久化队列称之为暂存队列。

    消息持久化

    当对消息设置持久化后,由于异常重启后持久化消息依然存在。在RabbitMQ中,消息是存放在队列中的。如果我们仅仅对队列持久化而消息不持久化,持久化队列在异常重启后依然存在,持久化队列中的消息不会在异常重启后存在;对消息设置了持久化而对队列未设置持久化,异常重启后队列不存在;因此我们对队列和消息都需要执行持久化设置。

    交换器持久化

    为了实现一对多(一方发送多方接收)的消息分发,通常需要使用发布订阅模式。交换器是实现发布订阅模式的基础(DirectExchange、TopicExchange、FanoutExchange、HeaderExchange).未设置交换器持久化发布订阅模式会出现如下问题:

  •  消费者由于异常或断电断开了与RabbitMQ服务器的连接,消息无法成功从队列发送到接收方,未设置交换器(Exchange)持久化,RabbitMQ会自动将与其对应的队列删除。异常重启后消费方无法重新获取交换器中队列及未及时消费的消息。

  • 生产者由于异常或断电的情况下,未设置交换器(Exchange)持久化,重启后交换器不存在,消息队列无法与交换器成功绑定,无法实现发布订阅模式。

  消息确认

    使用场景  

    在没有引入消息确认机制时,消费者成功从认为消息成消息队列接收到消息便被功消费,并且从消息队列中移除接收到的消息。在RabbitMQ消息投递过程中,会产生如下问题:

  •  消息是否成功发送到交换器(Exchange)

  • 消息是否成功与交换器的路由策略成功绑定并添加到消息队列

  • 消费者是否成功消费了队列中的消息

   这时我们需要引入消息确认机制来保证消息的准确发布和消费。

    消息确认方式

    

确认方式作用
auto自动确认。消息被消费者成功获取后便意味着消费成功,从队列中删除。
manual手动确认。消息被消费者成功获取后调用API,实现手动确认消费成功失败。防止消息未成功消费而队列消息丢失。

    消息确认类型

    

确认类型作用
消息发送确认确认是否成功到达交换器(Exchange);确认是否将交换器类型与消息队列成功绑定并添加至消息队列
消息接收确认确认消费者是否成功消费了消息队列中的消息

消息持久化

    队列持久化

    

new Queue("fanoutqueue-one",true,false);

    队列持久化,第一个参数为消息队列标识,消费者@RabbitListener中queues字段须与该标识一致,表示监听该消息队列消息并消费;第二个参数(durable:Boolean)设置为true表示队列持久化;第三个参数(autoDelete:Boolean)设置为false表示消费者客户端断开连接后是否自动删除消息队列(此处为不删除)。

    交换器持久化

 new FanoutExchange(fanoutExchangeName,true,false);

    交换器持久化,第一个参数为交换器名称(唯一标识),第二个参数(durable:Boolean)设置为true标识交换器持久化;第三个参数(autoDelete:Boolean)设置为false表示所有绑定的消息队列不在使用时是否自动删除交换器(此处为不删除)。

    消息持久化

    

  this.rabbitTemplate.convertAndSend(fanoutExchangeName,"FanoutSend");

    当我们使用rabbitTemplate.convertAndSend方法时默认实现了消息持久化;在使用FanoutExchange实现发布订阅模式时无需向使用TopicExchange时指定路由键(RoutingKey),只需要FanoutExchange唯一标识一致即可。

消息确认

#开启消息发送回调
spring.rabbitmq.publisher-confirms=true
#开启消息接收回调
spring.rabbitmq.publisher-returns=true

     发送确认
    

package com.springboot.demo.mq.rabbitmq.mqcallback;


import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;


import javax.annotation.PostConstruct;


/**
* 消息是否成功根据ExchangeType添加到消息队列(是否成功发送)
*/
@Component
public class MsgSendConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void init()
{
rabbitTemplate.setConfirmCallback(this);
}


/**
*
* @param correlationData:消息唯一标识
* @param b:发送确认成功或失败
* @param s:失败原因
*/
@Override
public void confirm(@Nullable CorrelationData correlationData, boolean b, @Nullable String s) {
System.out.println("消息唯一标识----------"+correlationData);
System.out.println("消息发送成功失败--------"+b);
System.out.print("失败原因-----------"+s);
}
}

    消息接收确认

package com.springboot.demo.mq.rabbitmq.mqcallback;


import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


import javax.annotation.PostConstruct;
@Component
public class MsgReturnConfirmCallback implements RabbitTemplate.ReturnCallback {
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void init()
{
rabbitTemplate.setReturnCallback(this);
}
/**
*
* @param message:消息主体
* @param replyCode 响应码
* @param replyText 响应内容
* @param exchange 交换器
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息主体----------"+message);
System.out.println("消息响应码--------"+replyCode);
System.out.print("响应内容-----------"+replyText);
System.out.println("使用交换器--------"+exchange);
System.out.println("使用路由键--------"+routingKey);
}
}


消息确认与实体关联

    在我们实现消息发送确认的过程中,通过实现RabbitTemplate.ConfirmCallback我们可以直到消息是否成功发送,但是无法获知具体哪条消息发送成功或失败,需要将Message和消息唯一标识(CorrelationData)进行绑定,就可以获知具体哪条消息发送成功或失败。

 UserInfo userInfo=new UserInfo();
userInfo.setUserId(10);
userInfo.setUserName("RabbitMQTest");
userInfo.setUserAge(28);
userInfo.setUserSex("1");
userInfo.setUserState("1");
userInfo.setSendDate(new Date());
//将实体转换为JSON字符串
String jsonInfo= JSONObject.toJSONString(userInfo);
//生成消息唯一标识
String messageId= UUID.randomUUID().toString();
/**
* 将消息唯一标识(messageId)与消息主体绑定
* 通过MessageBuilder生成消息主体Message
* withBody(entity):设置消息内容
* setContentType():设置消息内容类型
* setCorrelationId():设置消息唯一标识
* build():根据设置内容创建消息
*/
Message message=MessageBuilder.withBody(jsonInfo.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setCorrelationId(messageId).build();
/**
* 将消息唯一标识(messageId)与CorrelationData绑定
*/
CorrelationData correlationData=new CorrelationData(messageId);
this.rabbitTemplate.convertAndSend(fanoutExchangeName,"FanoutSend",message,correlationData);


 



文章转载自CodeWu,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论