暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

延时消息

新架构思考 2021-11-12
1583

一、延时消息是什么?

延时消息就是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才获取到这个消息进行消费。

二、延时消息如何实现?

一、RabbitMQ实现延迟队列

1. TTLTime-To-Live+DLXDead Letter Exchanges

消息在队列满足达到一定的条件,会被认为是死信消息(dead-lettered),这时候,RabbitMQ会重新把这类消息发到另外一个的exchange,这个exchange称为Dead Letter Exchanges.

以下任一条件满足,即可认为是死信:

  • 消息被拒绝消费(basic.reject or basic.nack)并且设置了requeue=fasle

  • 消息的TTL到了(消息过期)

  • 达到了队列的长度限制

需要注意的是,Dead letter exchanges (DLXs) 其实就是普通的exchange,可以和正常的exchange一样的声明或者使用。

 


设置消息过期的两种方式:

1、params.put("x-message-ttl", 5 *1000);
 
第一种方式是直接设置 Queue 延迟时间 但如果直接给队列设置过期时间,这种做法不是很灵活,(当然二者是兼容的,默认是时间小的优先)

 

2、rabbitTemplate.convertAndSend(book,message -> {
 message.getMessageProperties().setExpiration(2 * 1000 + "");
  return message;
 });

第二种就是每次发送消息动态设置延迟时间,这样我们可以灵活控制

DLX + TTL 方式存在的时序问题


 

 

左侧队列 queue1 分别两条消息msg1、msg2 过期时间都为 1s,输出顺序为 msg1、msg2 是没问题的

右侧队列 queue2 分别两条消息msg1、msg2 注意问题来了,msg2 的消息过期时间为1S 而 msg1 的消息过期为 2S,你可能想谁先过期就谁先消费呗,显然不是这样的,因为这是在同一个队列,必须前一个消费,第二个才能消费,所以就出现了时序问题。

如果你的消息过期时间是有规律的,例如,有的 1S、有的 2S,那么我们可以以时间为维度设计为两个队列,如下所示:

上面我们将 1S 过期的消息拆分为队列queue_1s,2S 过期的消息拆分为队列 queue_2s,事情得到进一步解决。如果此时消息的过期时间不确定或者消息过期时间维度过多,在消费端我们就要去监听多个消息队列且对于消息过期时间不确定的也是很难去设计的。

结论:

如果消息 TTL 是相同的,或者少量的固定的TTL的情况,使用 DLX + TTL 的这种方式是没问题的,对于我来说目前还是优选。

2. 利用安装rabbitmq_delayed_message_exchange插件

插件提供了一个新的 x-delayed-message 类型的 Exchange,消息发送时指定消息头 x-delay [:毫秒值] 就可以将消息进行延迟投递。


结论:

目前该插件的当前设计并不真正适合包含大量延迟消息(例如数十万或数百万)的场景,但基本的场景满足。

二、表存储+定时任务扫描

数据库一般来说是我们很容易想到的一个办法,我们通常可以建立下面这样一个表:

CREATE TABLE `delay_message` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `excute_time` bigint(16) DEFAULT NULL COMMENT '
执行时间,ms级别',
  `body` varchar(4096) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '消息体',
  PRIMARY KEY (`id`),
  KEY `time_index` (`excute_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

这个表中我们使用excute_time代表我们真实的执行时间,并且对其建立索引,然后在我们的消息服务中,启动一个定时任务,定时从数据库中扫描已经可以执行的消息,然后开始执行,具体流程如下面所示


结论:

使用数据库+定时任务的方法是一个比较原始的方法,数据库数据很多性能很差,这个方案通常不会被考虑。

三、Rocketmq延迟队列实现

Rocketmq的定时队列通过一个叫做“SCHEDULE_TOPIC_XXXX”的Topic来实现,这个Topic用来处理需要被延迟发送的消息。在Rocketmq中延迟消息被分为几个延迟级别,每个延迟级别分别对应“SCHEDULE_TOPIC_XXXX”Topic下一个延迟队列默认延迟级别为:”1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”。在Broker启动时,会启动相对应队列的线程来处理各个延迟队列的延迟消息。

结论:

支持特定的 level满足绝大部分场景,性能很好,但是目前我们使用的是RabbitMq消息队列,替换消息中间件成本较大,另外该方案不支持任意时间精度的延迟消息发送。

其他消息中间件实现任意精度的有:QMQ采用双重时间轮实现

四、自研采用时间轮+磁盘存储+缓存来实现


很多公司基于RocketMQ做自己的一套支持任意时间的延时消息,在美团内部封装了RocketMQ使用LevelDB做了对延时消息的封装,在滴滴开源的DDMQ中,使用了RocksDBRocketMQ的延时消息部分进行了封装。去哪儿网的QMQ的延时/定时消息使用的是两层hash wheel来实现的。

结论:

需求场景是否有此需求?研发周期长,需要研究现有开源实现。

 

 

 

 


文章转载自新架构思考,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论