RabbitMQ的持久化分为三部分:
交换器持久化:声明交换器时设置durable参数为true;
队列持久化:声明队列时设置durable参数为true;
消息持久化:生产者发布消息时设置deliveryMode为2;
由于交换器不存储消息,即使交换器不持久化,RabbitMQ重启后并不会丢失消息。
队列持久化只持久队列元数据,不会持久化消息,即使设置了队列持久化,RabbitMQ重启后仍然会丢失消息。
消息持久化可以真正持久化消息,但若不持久化队列,那么RabbitMQ重启后仍然会丢失消息。
如何确保数据不会丢失?
即使交换器、队列和消息都持久化,仍然会丢失消息,主要有以下几点考虑:
消费者autoAck如果设为true,那么当消费者收到消息还没来得及处理就挂掉了。解决办法:autoAck设为false;
由于有操作系统缓存,消息持久化并不会立马同步到磁盘,假如在同步磁盘前RabbitMQ挂掉了,则消息丢失。解决办法:使用RabbitMQ镜像队列机制,即用从节点做数据备份,除非整个集群都挂掉,否则消息是不会丢失的;
另外,发送端引入事务机制或确认机制,来确保消息是成功发送到了RabbitMQ,并且还要确保消息能够正确地理由到相应的队列中。
2、生产者确认机制
RabbitMQ为生产者提供了两种机制来确保消息成功发送到了服务端:
事务机制
确认机制
生产者事务机制和确认机制只要确保消息发送到了交换器即可,不管这个交换器能不能正确地把消息路由到队列中。
事务机制和确认机制只能二选一,不能共存。
2.1、事务机制
相关方法:
channel.txSelect:将当前信道设成事务模式;
channel.txCommit:如果事务提交成功,则消息一定到达了RabbitMQ;
channel.txRollback:用try-catch块包裹住channel.txCommit,如果事务提交不成功,则可以进行回滚;
try {
channel.txSelect();
// 不存在该交换器
channel.basicPublish("test_tx1","key",true,false,null,("hello,world:tx1").getBytes());
// 存在该交换器
channel.basicPublish("test_tx2","key",true,false,null,("hello,world:tx2").getBytes());
// 消息会发送失败,然后回滚,原本能正确发送的消息hello,world:tx2也会回滚
channel.txCommit();
} catch (IOException e) {
e.printStackTrace();
channel.txRollback();
}
2.2、确认机制
由于事务机制十分消耗性能,因此RabbitMQ引入了一种轻量级的替代方案:确认机制。
事务机制是同步的,发送一条消息之后立马阻塞,等RabbitMQ回应之后才会发送下一条消息。
确认机制是异步的,通过回调函数处理。
生产者通过channel.confirmSelect方法将信道设置为确认模式,RabbitMQ会在消息正确发送到交换器后给生产者返回一个确认。如果没有对应的交换器,则直接抛出异常。
try {
channel.confirmSelect();
// 没有该交换器,直接抛出异常
channel.basicPublish("no_this_exchange", "key",null, "message".getBytes());
if (!channel.waitForConfirms()) {
System.out.println("消息发送失败");
}else{
System.out.println("消息发送成功");
}
// 有该交换器,但是没有绑定的队列,消息会发送成功
channel.basicPublish("test_confirm", "key",null, "message".getBytes());
if (!channel.waitForConfirms()) {
System.out.println("消息发送失败");
}else{
System.out.println("消息发送成功");
}
} catch (InterruptedException e){
e.printStackTrace();
}
每发送一条消息,就调用一次waitForConfirms效率有点慢,可以有以下两种方案:
a)批量confirm
每发送一批消息确认一次,如果确认失败,则重新发送这批消息,代码如下所示。
channel.confirmSelect();
int count = 0;
List<String> list = new ArrayList<>();
for (;;) {
channel.basicPublish("test_confirm", "key", null, "message".getBytes());
// 将发送出去的消息存入缓存中
list.add("message");
if (++count >= 10) {
count = 0;
if (channel.waitForConfirms()) {
System.out.println("批量确认成功!");
// 将缓存中的消息清空
list.clear();
}else{
System.out.println("批量确认失败,需要将缓存起来的消息重发!");
}
}
// 每半秒钟发送一次消息
Thread.sleep(500);
}
b)异步confirm
代码如下所示。
// 为每个channel维护一个unconfirm的消息序号集合
TreeSet<Long> unconfirmSet = new TreeSet();
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
// deliveryTag是消息的唯一序列号
@Override
public void handleAck(long deliveryTag, boolean multiple) {
System.out.println("异步确认,消息发生成功");
if (multiple) {
unconfirmSet.headSet(deliveryTag+1).clear();
} else {
unconfirmSet.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) {
System.out.println("异步确认,消息发生失败,消息需要重发!");
// 在这里添加消息重发的逻辑
if (multiple) {
unconfirmSet.headSet(deliveryTag+1).clear();
} else {
unconfirmSet.remove(deliveryTag);
}
}
});
// 每半秒发生一条消息
for (;;) {
// 当处于confirm模式时,返回下一条要发布的消息的序列号
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish("test_confirm","key",null,"message".getBytes());
// 每发布一条消息,unconfirmSet就添加一个元素
unconfirmSet.add(nextSeqNo);
Thread.sleep(500);
}
总结:
事务机制和普通confirm机制编程简单,但吞吐量低,批量confirm和异步confirm则相反,根据实际场景选择不同的应对方式。
3、消息的顺序性
RabbitMQ很难保证消息的顺序性,具体的原因有:
多个生产者同时发消息,无法保证消息到达Broker的先后顺序;
对于事务机制,假设RabbitMQ异常,那么消息发送失败,需要回滚消息,补偿消息时使用了另一个线程;
对于确认机制,假设RabbitMQ异常,那么消息发送失败,补偿消息时使用了另一个线程;
对消息设置了不同的TTL,某些消息进入了死信队列中;
对消息设置了优先级;
队列中消息是有序的,但有多个消费者消费同一队列,尽管轮询机制这也是有序的,但如果消费者拒绝了消息并requeue,那么消息就会错序了;
如果要保证消息的顺序性,需要业务方使用RabbitMQ之后做进一步的处理,比如在消息体内添加ID来实现。
4、消息的传输保障
消息的传输保证分三个层级:
最多一次:消息可能丢失,但绝不会重复传输;这个比较简单;
最少一次:消息不会丢失,但可能会重复传输;实现方案如下:
生产者开启确认机制,保证消息可靠地传输到RabbitMQ中;
生产者发布消息时将mandatory参数设为true,并且使用备份交换器;
消息和队列都要持久化,防止RabbitMQ重启消息丢失;
消费者autoAck设为false,并手动确认;
恰好一次:消息恰好消费一次;这个是无法保证的,具有情形如下:
消费者在消费完一条消息后向RabbitMQ发送确认命令,此时由于网络断开或其它原因造成RabbitMQ没收到确认命令,那么这条消息也不会被删除,在重新建立连接后,消费者会重复消费这条消息;
生产者在使用确认机制时,发送完一条消息等待RabbitMQ返回确认通知,此时网络断开,生产者捕获到异常情况,为了确保消息可靠性选择重新发送,这样RabbitMQ中就有两条同样的消息,在消费的时候,消费者就会重复消费;
对于以上情形,RabbitMQ也没有去重机制来保证恰好一次,目前大多数的消息中间件也没有去重机制,一般是根据实际场景通过业务进行去重。





