暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

RabbitMQ第五弹 如何保证消息不丢失?

波波的小书房 2021-08-10
483
1、持久化

RabbitMQ的持久化分为三部分:

  • 交换器持久化:声明交换器时设置durable参数为true;

  • 队列持久化:声明队列时设置durable参数为true;

  • 消息持久化:生产者发布消息时设置deliveryMode为2;

由于交换器不存储消息,即使交换器不持久化,RabbitMQ重启后并不会丢失消息。

队列持久化只持久队列元数据,不会持久化消息,即使设置了队列持久化,RabbitMQ重启后仍然会丢失消息。

消息持久化可以真正持久化消息,但若不持久化队列,那么RabbitMQ重启后仍然会丢失消息。


如何确保数据不会丢失?

即使交换器、队列和消息都持久化,仍然会丢失消息,主要有以下几点考虑:

  • 消费者autoAck如果设为true,那么当消费者收到消息还没来得及处理就挂掉了。解决办法:autoAck设为false;

  • 由于有操作系统缓存,消息持久化并不会立马同步到磁盘,假如在同步磁盘前RabbitMQ挂掉了,则消息丢失。解决办法:使用RabbitMQ镜像队列机制,即用从节点做数据备份,除非整个集群都挂掉,否则消息是不会丢失的;

另外,发送端引入事务机制或确认机制,来确保消息是成功发送到了RabbitMQ,并且还要确保消息能够正确地理由到相应的队列中。

2、生产者确认机制

RabbitMQ为生产者提供了两种机制来确保消息成功发送到了服务端:

  • 事务机制

  • 确认机制

生产者事务机制和确认机制只要确保消息发送到了交换器即可,不管这个交换器能不能正确地把消息路由到队列中。

事务机制和确认机制只能二选一,不能共存。

2.1、事务机制

相关方法:

  • channel.txSelect:将当前信道设成事务模式;

  • channel.txCommit:如果事务提交成功,则消息一定到达了RabbitMQ;

  • channel.txRollback:用try-catch块包裹住channel.txCommit,如果事务提交不成功,则可以进行回滚;


try {
  channel.txSelect();
  // 不存在该交换器
  channel.basicPublish("test_tx1","key",true,false,null,("hello,world:tx1").getBytes());
  // 存在该交换器
  channel.basicPublish("test_tx2","key",true,false,null,("hello,world:tx2").getBytes());
  // 消息会发送失败,然后回滚,原本能正确发送的消息hello,world:tx2也会回滚
  channel.txCommit();
} catch (IOException e) {
  e.printStackTrace();
  channel.txRollback();
}


2.2、确认机制

由于事务机制十分消耗性能,因此RabbitMQ引入了一种轻量级的替代方案:确认机制。

事务机制是同步的,发送一条消息之后立马阻塞,等RabbitMQ回应之后才会发送下一条消息。

确认机制是异步的,通过回调函数处理。

生产者通过channel.confirmSelect方法将信道设置为确认模式,RabbitMQ会在消息正确发送到交换器后给生产者返回一个确认。如果没有对应的交换器,则直接抛出异常。


try {
  channel.confirmSelect();
  // 没有该交换器,直接抛出异常
  channel.basicPublish("no_this_exchange", "key",null, "message".getBytes());
  if (!channel.waitForConfirms()) {
    System.out.println("消息发送失败");
  }else{
    System.out.println("消息发送成功");
  }
 
  // 有该交换器,但是没有绑定的队列,消息会发送成功
  channel.basicPublish("test_confirm", "key",null, "message".getBytes());
  if (!channel.waitForConfirms()) {
    System.out.println("消息发送失败");
  }else{
    System.out.println("消息发送成功");
  }
} catch (InterruptedException e){
  e.printStackTrace();
}


每发送一条消息,就调用一次waitForConfirms效率有点慢,可以有以下两种方案:

a)批量confirm

每发送一批消息确认一次,如果确认失败,则重新发送这批消息,代码如下所示。

channel.confirmSelect();
int count = 0;
List<String> list = new ArrayList<>();
for (;;) {
  channel.basicPublish("test_confirm", "key", null, "message".getBytes());
  // 将发送出去的消息存入缓存中
  list.add("message");
  if (++count >= 10) {
    count = 0;
    if (channel.waitForConfirms()) {
      System.out.println("批量确认成功!");
      // 将缓存中的消息清空
      list.clear();
    }else{
      System.out.println("批量确认失败,需要将缓存起来的消息重发!");
    }
  }
  // 每半秒钟发送一次消息
  Thread.sleep(500);
}


b)异步confirm

代码如下所示。

// 为每个channel维护一个unconfirm的消息序号集合
TreeSet<Long> unconfirmSet = new TreeSet();
 
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
  // deliveryTag是消息的唯一序列号
  @Override
  public void handleAck(long deliveryTag, boolean multiple)
{
    System.out.println("异步确认,消息发生成功");
    if (multiple) {
      unconfirmSet.headSet(deliveryTag+1).clear();
    } else {
      unconfirmSet.remove(deliveryTag);
    }
  }
 
  @Override
  public void handleNack(long deliveryTag, boolean multiple)
{
    System.out.println("异步确认,消息发生失败,消息需要重发!");
    // 在这里添加消息重发的逻辑
    if (multiple) {
      unconfirmSet.headSet(deliveryTag+1).clear();
    } else {
      unconfirmSet.remove(deliveryTag);
    }
 
  }
});
 
// 每半秒发生一条消息
for (;;) {
  // 当处于confirm模式时,返回下一条要发布的消息的序列号
  long nextSeqNo = channel.getNextPublishSeqNo();
  channel.basicPublish("test_confirm","key",null,"message".getBytes());
  // 每发布一条消息,unconfirmSet就添加一个元素
  unconfirmSet.add(nextSeqNo);
  Thread.sleep(500);
}


总结:

事务机制和普通confirm机制编程简单,但吞吐量低,批量confirm和异步confirm则相反,根据实际场景选择不同的应对方式。

3、消息的顺序性

RabbitMQ很难保证消息的顺序性,具体的原因有:

  • 多个生产者同时发消息,无法保证消息到达Broker的先后顺序;

  • 对于事务机制,假设RabbitMQ异常,那么消息发送失败,需要回滚消息,补偿消息时使用了另一个线程;

  • 对于确认机制,假设RabbitMQ异常,那么消息发送失败,补偿消息时使用了另一个线程;

  • 对消息设置了不同的TTL,某些消息进入了死信队列中;

  • 对消息设置了优先级;

  • 队列中消息是有序的,但有多个消费者消费同一队列,尽管轮询机制这也是有序的,但如果消费者拒绝了消息并requeue,那么消息就会错序了;

如果要保证消息的顺序性,需要业务方使用RabbitMQ之后做进一步的处理,比如在消息体内添加ID来实现。

4、消息的传输保障

消息的传输保证分三个层级:

最多一次:消息可能丢失,但绝不会重复传输;这个比较简单;

最少一次:消息不会丢失,但可能会重复传输;实现方案如下:

  • 生产者开启确认机制,保证消息可靠地传输到RabbitMQ中;

  • 生产者发布消息时将mandatory参数设为true,并且使用备份交换器;

  • 消息和队列都要持久化,防止RabbitMQ重启消息丢失;

  • 消费者autoAck设为false,并手动确认;

恰好一次:消息恰好消费一次;这个是无法保证的,具有情形如下:

  • 消费者在消费完一条消息后向RabbitMQ发送确认命令,此时由于网络断开或其它原因造成RabbitMQ没收到确认命令,那么这条消息也不会被删除,在重新建立连接后,消费者会重复消费这条消息;

  • 生产者在使用确认机制时,发送完一条消息等待RabbitMQ返回确认通知,此时网络断开,生产者捕获到异常情况,为了确保消息可靠性选择重新发送,这样RabbitMQ中就有两条同样的消息,在消费的时候,消费者就会重复消费;

对于以上情形,RabbitMQ也没有去重机制来保证恰好一次,目前大多数的消息中间件也没有去重机制,一般是根据实际场景通过业务进行去重。


文章转载自波波的小书房,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论