死信队列是什么
死信,Dead Letter,一种消息机制,当消费者去消费队列中的消息时,如果队列中的消息出现了以下的情况:
•消费端执行nack或者reject时,设置requeue=false;•消息在队列中的时间超过设置的TTL(Time To Live)时间;•队列中消息的数量超过设置的最大数量;
那么这些消息就可以被称之为死信消息,在配置了死信队列的情况下,死信消息会进入死信队列,如果没有配置死信队列,这些死信消息会被丢弃。
理解死信队列
死信队列并不仅仅只是一个queue,还包含死信交换机(Dead Letter Exchange),关于死信队列和死信交换机要说明几点:
死信交换机可以是fanout、direct、topic等类型,和普通交换机并无不同;
死信交换机要绑定要业务队列上才会生效;
给死信交换机绑定的队列称之为死信队列,其实就是普通的队列,没有任何特殊之处;
并不是整个项目只能设置一个死信交换机和死信队列,可以根据业务需要设置多个或者单个死信交换机使用不同的routing-key;
代码示例
配置文件
spring:rabbitmq:addresses: 127.0.0.1:5672username: lzmpassword: lzmvirtual-host: testlistener:simple:acknowledge-mode: manual # 手动ackdefault-requeue-rejected: false # 设置为false,requeue或reject
创建交换机和队列以及绑定
/*** 死信交换机*/@Beanpublic DirectExchange dlxExchange(){return new DirectExchange(dlxExchangeName);}/*** 死信队列*/@Beanpublic Queue dlxQueue(){return new Queue(dlxQueueName);}/*** 死信队列绑定死信交换机*/@Beanpublic Binding dlcBinding(Queue dlxQueue, DirectExchange dlxExchange){return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlxRoutingKey);}/*** 业务队列*/@Beanpublic Queue queue(){Map<String,Object> params = new HashMap<>();params.put("x-dead-letter-exchange",dlxExchangeName);//声明当前队列绑定的死信交换机params.put("x-dead-letter-routing-key",dlxRoutingKey);//声明当前队列的死信路由键params.put("x-message-ttl",10000);//设置队列消息的超时时间,单位毫秒,超过时间进入死信队列params.put("x-max-length", 10);//生命队列的最大长度,超过长度的消息进入死信队列return QueueBuilder.durable(queueName).withArguments(params).build();}/*** 业务交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(exchangeName,true,false);}/*** 业务队列和业务交换机的绑定*/@Beanpublic Binding binding(Queue queue, FanoutExchange fanoutExchange){return BindingBuilder.bind(queue).to(fanoutExchange);}
注意创建业务队列的部分,设置业务队列的超时时间是10s,队列中消息最大数量为10。
上面代码中,业务交换机为fanout类型的交换机,死信交换机为Direct类型的交换机。
生产者
public void send(){for (int i = 0; i < 5; i++) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(exchangeName,"","消息==>"+i,message -> {message.getMessageProperties().setExpiration(3000+"");//发送消息时设置消息的超时时间return message;},correlationData);}}
注意:
队列中消息的超时时间可以是在创建队列时设置,表示对队列中所有的消息生效,也可以在发送消息时设置,两者相比取最小值作为TTL的值。
先不启动消费者,此时启动生产者并向其中发送消息,刚发送完消息时如下所示:

三秒后消息自动进入死信队列中

这也就验证了上述所说的,当消息在队列中的时间超过TTL的时间时,消息会自动进入死信队列。针对这一特性,可以给消息设置过期时间后发送到某个队列,从而来进行延迟消费
注意看上图的红框中的内容:
Lim:表示设置了队列中消息数量x-max-length参数
DLX:表示设置了死信交换机x-dead-letter-exchange参数
DLK:表示设置了死信路由键x-dead-letter-routing-key参数,不设置该值时,消息在进入死信队列后,路由键保持原来的不变,设置了该值,消息的路由键就变为新设置的值。
下面我们启动消费者,并且模拟在某些情况下执行nack操作,先看消费者代码
@RabbitHandler@RabbitListener(queues = {"${platform.queue-name}"},concurrency = "1")public void msgConsumer(String msg, Channel channel, Message message) throws IOException {try {if(msg.indexOf("5")>-1){throw new RuntimeException("抛出异常");}log.info("消息{}消费成功",msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {log.error("接收消息过程中出现异常,执行nack");//第三个参数为true表示异常消息重新返回队列,会导致一直在刷新消息,且返回的消息处于队列头部,影响后续消息的处理channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);log.error("消息{}异常",message.getMessageProperties().getHeaders());}}
当消息中包含5时抛出异常,执行nack,其他消息都执行ack,生产者发送0-9共10条消息,执行结果如下:

同时查看死信队列中的数据,确实只有1条

并且消息的交换机以及路由键都是我们在代码中设置好的值

同时消息的headers中也会将进入死信队列的原因以及次数等进行说明
也就是说在执行nack,同时设置requeue=false时,消息会自动进入死信队列
最后我们再测试一下最大数量的问题,前面我们设置队列中最大数量是10,此时关闭消费者,同时删除队列的TTL,然后发送20条数据到业务队列中

可以看到业务队列和死信队列各有10条数据,也就是说队列中的消息数量超过设置的最大数量时,消息会进入死信队列
总结
死信交换机和死信队列都只是普通的交换机和队列,只不过被用来处理死信消息,而死信消息的产生是由于TTL过期或者队列中的消息数超过最大消息数,再或者消费端reject或者nack消息时设置了requeue=false,消息变为死信后,由死信交换机路由到死信队列,再由专门的消费者消费死信队列中的消息。
死信队列更多的是用来保证消息的可靠性,主要用于比较重要的队列,用以确保未被正确消费的消息不会丢失,其实也可以不用死信队列,在消费端出现异常时,可以将消息从当前队列ack掉,再将其发送到其他队列,然后再单独处理其他队列,这都是可以的。
本节测试代码参考码云[1].
引用链接
[1]
码云: https://gitee.com/liaidai/rabbitmq-practice/tree/master/rabbit-dlx-7802




