暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

如何通过 Redis 来实现消息延迟队列

rookiedev 2023-02-13
62

延迟队列或许大家都听过,可能也用过,一种比较常见的应用场景就是用户下单后,超过 10 分钟未支付将用户订单取消,这种其实就可以通过延迟队列来实现,这里先不考虑订单数量特别大的情况。大概流程就是用户下单后,订单服务创建订单保存后同时将这笔订单信息放入延迟队列中,设置延迟时间为 10 分钟,10 分钟后从队列中取出来判断支付状态,未支付则将这笔订单取消。

实现延迟队列的方式或许有很多种,如果用 redis 来实现一个延迟队列功能应该怎么做呢,主要是通过 redis 里面的 Zset 集合来实现,Zset 自带的 score 值排序特性就非常适合用来做时间戳排序,当消息的时间戳值小于等于当前时间戳,将该消息取出来进行消费,执行相应的业务逻辑。
实现原理大致如下:
主要分为三大块,生产者,调度器,消费者,生产者负责往 redis 里面生产消息,调度器定时从 Zset 中取出已经到了执行时间的消息并移到 List 集合中,同时消费者负责不断将 List 集合中的消息取出来进行消费。
  • Producer

  1. 首先将消息原始内容保存到 redis 中,key,val 结构,key 是消息体的唯一标识,val 是消息体内容

  2. 将消息 key 根据延迟时间设置对应 score 值添加到 Zset 有序集合中,key 为消息体 topic,score 为当前时间戳加上延迟时间,member 为消息 key

  • Broker

  1. 定时遍历所有 topic,从 Zset 有序集合中通过 zrangebyscore 命令取出每个 topic 下 score 值小于等于当前时间戳的消息 key

  2. 将取出来的消息 key push 放入已经就绪可消费的消息 List 集合中

  • Consumer

  1. 定时遍历所有消息就绪的 topic,取出每个 topic 下可消费的消息 key

  2. 根据消息 key 从 redis 中取出消息体内容,执行相应业务逻辑

存储结构设计:
  • 消息内容存储: (key,val) 结构,例:(msgId1, msgBody)

  • zset 消息 key 存储:(key, score, member),例:(topic1,time,msgId1)

  • List 集合消息 key 存储:(key,[value…]),例:(topic1,[msgId1,msgId2])

上面 Zset 和 List 里面的消息根据不同 topic 作为 key 进行存储,这样可以避免将所有消息都放到一个 topic 下,当消息量比较多的时候出现 redis 里面的大 key 问题,导致存储和查询性能降低。
核心流程图如下:

redis 实现延迟队列的优缺点:
  • 优点

  1. 消息存储在内存中,存储查询速度都比较快

  2. 对于一些简单的应用场景,redis 实现的延迟队列足够轻量,也基本能够符合业务需求

  • 缺点

  1. 由于消息是存储在内存中,因此一旦服务器宕机,存在丢失数据的可能性,尽管 redis 具备持久化的功能,但要做到消息完全不丢失,那么 redis 的持久化策略就需要配置的更严格,但随之带来的就是性能的降低

  2. 消息的 Ack 机制缺失,如果要做到严格的 Ack 功能,则需要通过业务代码来实现,但随之带来的也就是复杂性的提高

总之没有绝对最好的技术实现,只有相对更适合业务需求的技术实现,任何特性在特定场景下可能是优点,而在另外一种场景可能就是缺点了。
核心代码实现:
Broker 调度实现:
1public class DelayQueueBroker {
2
3    private List<String> topicList;
4
5    private RedisDataStore redisDataStore;
6
7    protected ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1, Executors.defaultThreadFactory());
8
9    public DelayQueueBroker(List<String> topicList, RedisDataStore redisDataStore){
10        this.topicList = topicList;
11        this.redisDataStore = redisDataStore;
12    }
13
14    public void start(){
15        scheduledThreadPool.scheduleAtFixedRate(new RouteThread(), 11, TimeUnit.SECONDS);
16    }
17
18    private class RouteThread implements Runnable {
19
20        @Override
21        public void run() {
22            try {
23                topicList.forEach(topic -> {
24                    List<String> topicKeyList = redisDataStore.zRangeByScore(topic, System.currentTimeMillis());
25                    if(!CollectionUtils.isEmpty(topicKeyList)){
26                        System.out.println("执行时间到了的topicKeyList=" + JSON.toJSONString(topicKeyList));
27                        redisDataStore.lPushAll(topic, topicKeyList);
28                        redisDataStore.zRem(topic, topicKeyList);
29                    }
30                });
31            } catch (Exception e){
32                e.printStackTrace();
33            }
34        }
35    }
36}
Consumer 消费实现:
1public class DelayQueueConsumer {
2
3    private List<String> topicList;
4
5    private RedisDataStore redisDataStore;
6
7    private Map<String, DelayQueueSubscriber> subscriberMap;
8
9    protected ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1, Executors.defaultThreadFactory());
10
11    public DelayQueueConsumer(List<String> topicList, Map<String, DelayQueueSubscriber> subscriberMap, RedisDataStore redisDataStore){
12        this.topicList = topicList;
13        this.subscriberMap = subscriberMap;
14        this.redisDataStore = redisDataStore;
15    }
16
17    public void start(){
18        scheduledThreadPool.scheduleAtFixedRate(new DelayQueueConsumer.ConsumeThread(), 11, TimeUnit.SECONDS);
19    }
20
21    private class ConsumeThread implements Runnable {
22
23        @Override
24        public void run() {
25            topicList.forEach(topic -> {
26                try {
27                    List<String> topicKeyList = redisDataStore.lRange(topic, 0, -1);
28                    topicKeyList.forEach(msgKey -> {
29                        System.out.println("执行时间到了, msgKey=" + msgKey);
30                        DelayQueueSubscriber subscriber = subscriberMap.get(topic);
31                        DelayMessage delayMessage = redisDataStore.get(msgKey);
32                        subscriber.consume(delayMessage);
33                        redisDataStore.lRem(topic, msgKey);
34                    });
35                }catch (Exception e){
36                    e.printStackTrace();
37                }
38            });
39        }
40    }
41}
业务消费接口:
1    // 延迟消息消费业务逻辑实现接口
2public interface DelayQueueSubscriber {
3
4    void consume(DelayMessage delayMessage);
5}
Producer 生产实现:
1public class DelayQueueSender {
2
3    private RedisDataStore redisDataStore;
4
5    public DelayQueueSender(RedisDataStore redisDataStore){
6        this.redisDataStore = redisDataStore;
7    }
8
9    public void add(DelayMessage delayMessage){
10        redisDataStore.set(delayMessage);
11        redisDataStore.zAdd(delayMessage);
12    }
13}
数据存储层实现:
1public class RedisDataStore extends AbstractDataStore {
2
3    private static final String MSG_KEY_PREFIX = "MSG_";
4
5    private static final String ZSET_KEY_PREFIX = "ZSET_";
6
7    private static final String LIST_KEY_PREFIX = "LIST_";
8
9    private final RedisTemplate<String, String> redisTemplate;
10
11    /**
12     * 延迟过期时间.
13     */

14    private final long DELAY_EXPIRED_MILL_SECONDS = 5 * 60 * 1000;
15
16    public RedisDataStore(RedisTemplate<String, String> redisTemplate) {
17        this.redisTemplate = redisTemplate;
18    }
19
20    public boolean set(DelayMessage message) {
21        redisTemplate.opsForValue().set(MSG_KEY_PREFIX + message.getKey(), JSON.toJSONString(message),
22                message.getTimestamp() - System.currentTimeMillis() + DELAY_EXPIRED_MILL_SECONDS, TimeUnit.MILLISECONDS);
23        return true;
24    }
25
26    public DelayMessage get(String key) {
27        String value = redisTemplate.opsForValue().get(MSG_KEY_PREFIX + key);
28        if (StringUtils.isNotEmpty(value)) {
29            return JSON.parseObject(value, DelayMessage.class);
30        }
31        return null;
32    }
33
34    public Boolean zAdd(DelayMessage message) {
35        return redisTemplate.opsForZSet().add(ZSET_KEY_PREFIX + message.getTopic(), message.getKey(), message.getTimestamp());
36    }
37
38    public List<String> zRangeByScore(String key, long timestamp) {
39        Set<ZSetOperations.TypedTuple<String>> tupleSet = redisTemplate.opsForZSet()
40                .rangeByScoreWithScores(ZSET_KEY_PREFIX + key, 0, timestamp);
41        List<String> msgKeyList = new ArrayList<>();
42        if (tupleSet != null) {
43            tupleSet.forEach(item -> {
44                String val = item.getValue();
45                Double score = item.getScore();
46                if (score != null && score.longValue() < timestamp) {
47                    msgKeyList.add(val);
48                }
49            });
50        }
51        return msgKeyList;
52    }
53
54    public boolean zRem(String key, List<String> values) {
55        Long result = redisTemplate.opsForZSet().remove(ZSET_KEY_PREFIX + key, values.toArray(new Object[0]));
56        return result != null && result.intValue() == values.size();
57    }
58
59    public boolean lPushAll(String key, List<String> values) {
60        Long result = redisTemplate.opsForList().leftPushAll(LIST_KEY_PREFIX + key, values);
61        return true;
62    }
63
64    public boolean lRem(String key, String value){
65        Long result = redisTemplate.opsForList().remove(LIST_KEY_PREFIX + key, 0, value);
66        return result != null && result.intValue() == 1;
67    }
68
69    public List<String> lRange(String key, int start, int end) {
70        return redisTemplate.opsForList().range(LIST_KEY_PREFIX + key, start, end);
71
72    }
73}
分别启动运行 Broker,Consumer,最后就可以往延迟队列里面添加消息了:
1public static void main(String[] args) {
2
3     RedisDataStore redisDataStore = new RedisDataStore(stringRedisTemplate);
4
5        List<String> topicList = Arrays.asList("DELAY_QUEUE_TOPIC1""DELAY_QUEUE_TOPIC2");
6        DelayQueueBroker broker = new DelayQueueBroker(topicList, redisDataStore);
7        broker.start();
8
9        Map<String, DelayQueueSubscriber> subscriberMap = new HashMap<>();
10        subscriberMap.put("DELAY_QUEUE_TOPIC1"new Topic1MessageSubscriber());
11        subscriberMap.put("DELAY_QUEUE_TOPIC2"new Topic2MessageSubscriber());
12
13        DelayQueueConsumer consumer = new DelayQueueConsumer(topicList, subscriberMap, redisDataStore);
14    consumer.start();
15
16    DelayQueueSender sender = new DelayQueueSender(redisDataStore);
17    DelayMessage delayMessage = new DelayMessage();
18    delayMessage.setKey("TOPIC1_KEY_" + i);
19    delayMessage.setTopic("DELAY_QUEUE_TOPIC1");
20    delayMessage.setTimestamp(System.currentTimeMillis() + i * 1000);
21    delayMessage.setContent("TOPIC2_CONTENT_" + i);
22    sender.add(delayMessage);
23
24    }
通过 redis 来实现延迟队列的大致原理就如上面所描述,但这样肯定是还没办法用到实际生产业务中去,首先 redis 执行命令的原子性就没有做保证,比如说将消息 lpush 放到 List 中成功了,而 Zset 中 zrem 失败了就会出现消息多次消费的情况。
上面整个的描述和一些简单实现示例就是为了把实现原理介绍清楚,距离真正应用到业务生产中还有很多需要优化的地方,比如说引入 Lua 脚本来做到批量命令执行的原子性,加入消息消费的重试策略,Broker 和 Consumer 定时任务优化,避免在没有消息要处理的时候存在很多无效的 redis 请求。
觉得有点用,给个赞吧

文章转载自rookiedev,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论