
延迟队列或许大家都听过,可能也用过,一种比较常见的应用场景就是用户下单后,超过 10 分钟未支付将用户订单取消,这种其实就可以通过延迟队列来实现,这里先不考虑订单数量特别大的情况。大概流程就是用户下单后,订单服务创建订单保存后同时将这笔订单信息放入延迟队列中,设置延迟时间为 10 分钟,10 分钟后从队列中取出来判断支付状态,未支付则将这笔订单取消。
Producer
首先将消息原始内容保存到 redis 中,key,val 结构,key 是消息体的唯一标识,val 是消息体内容
将消息 key 根据延迟时间设置对应 score 值添加到 Zset 有序集合中,key 为消息体 topic,score 为当前时间戳加上延迟时间,member 为消息 key
Broker
定时遍历所有 topic,从 Zset 有序集合中通过 zrangebyscore 命令取出每个 topic 下 score 值小于等于当前时间戳的消息 key
将取出来的消息 key push 放入已经就绪可消费的消息 List 集合中
Consumer
定时遍历所有消息就绪的 topic,取出每个 topic 下可消费的消息 key
根据消息 key 从 redis 中取出消息体内容,执行相应业务逻辑
消息内容存储: (key,val) 结构,例:(msgId1, msgBody)
zset 消息 key 存储:(key, score, member),例:(topic1,time,msgId1)
List 集合消息 key 存储:(key,[value…]),例:(topic1,[msgId1,msgId2])

优点
消息存储在内存中,存储查询速度都比较快
对于一些简单的应用场景,redis 实现的延迟队列足够轻量,也基本能够符合业务需求
缺点
由于消息是存储在内存中,因此一旦服务器宕机,存在丢失数据的可能性,尽管 redis 具备持久化的功能,但要做到消息完全不丢失,那么 redis 的持久化策略就需要配置的更严格,但随之带来的就是性能的降低
消息的 Ack 机制缺失,如果要做到严格的 Ack 功能,则需要通过业务代码来实现,但随之带来的也就是复杂性的提高
1public class DelayQueueBroker {
2
3 private List<String> topicList;
4
5 private RedisDataStore redisDataStore;
6
7 protected ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1, Executors.defaultThreadFactory());
8
9 public DelayQueueBroker(List<String> topicList, RedisDataStore redisDataStore){
10 this.topicList = topicList;
11 this.redisDataStore = redisDataStore;
12 }
13
14 public void start(){
15 scheduledThreadPool.scheduleAtFixedRate(new RouteThread(), 1, 1, TimeUnit.SECONDS);
16 }
17
18 private class RouteThread implements Runnable {
19
20 @Override
21 public void run() {
22 try {
23 topicList.forEach(topic -> {
24 List<String> topicKeyList = redisDataStore.zRangeByScore(topic, System.currentTimeMillis());
25 if(!CollectionUtils.isEmpty(topicKeyList)){
26 System.out.println("执行时间到了的topicKeyList=" + JSON.toJSONString(topicKeyList));
27 redisDataStore.lPushAll(topic, topicKeyList);
28 redisDataStore.zRem(topic, topicKeyList);
29 }
30 });
31 } catch (Exception e){
32 e.printStackTrace();
33 }
34 }
35 }
36}
1public class DelayQueueConsumer {
2
3 private List<String> topicList;
4
5 private RedisDataStore redisDataStore;
6
7 private Map<String, DelayQueueSubscriber> subscriberMap;
8
9 protected ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1, Executors.defaultThreadFactory());
10
11 public DelayQueueConsumer(List<String> topicList, Map<String, DelayQueueSubscriber> subscriberMap, RedisDataStore redisDataStore){
12 this.topicList = topicList;
13 this.subscriberMap = subscriberMap;
14 this.redisDataStore = redisDataStore;
15 }
16
17 public void start(){
18 scheduledThreadPool.scheduleAtFixedRate(new DelayQueueConsumer.ConsumeThread(), 1, 1, TimeUnit.SECONDS);
19 }
20
21 private class ConsumeThread implements Runnable {
22
23 @Override
24 public void run() {
25 topicList.forEach(topic -> {
26 try {
27 List<String> topicKeyList = redisDataStore.lRange(topic, 0, -1);
28 topicKeyList.forEach(msgKey -> {
29 System.out.println("执行时间到了, msgKey=" + msgKey);
30 DelayQueueSubscriber subscriber = subscriberMap.get(topic);
31 DelayMessage delayMessage = redisDataStore.get(msgKey);
32 subscriber.consume(delayMessage);
33 redisDataStore.lRem(topic, msgKey);
34 });
35 }catch (Exception e){
36 e.printStackTrace();
37 }
38 });
39 }
40 }
41}
1 // 延迟消息消费业务逻辑实现接口
2public interface DelayQueueSubscriber {
3
4 void consume(DelayMessage delayMessage);
5}
1public class DelayQueueSender {
2
3 private RedisDataStore redisDataStore;
4
5 public DelayQueueSender(RedisDataStore redisDataStore){
6 this.redisDataStore = redisDataStore;
7 }
8
9 public void add(DelayMessage delayMessage){
10 redisDataStore.set(delayMessage);
11 redisDataStore.zAdd(delayMessage);
12 }
13}
1public class RedisDataStore extends AbstractDataStore {
2
3 private static final String MSG_KEY_PREFIX = "MSG_";
4
5 private static final String ZSET_KEY_PREFIX = "ZSET_";
6
7 private static final String LIST_KEY_PREFIX = "LIST_";
8
9 private final RedisTemplate<String, String> redisTemplate;
10
11 /**
12 * 延迟过期时间.
13 */
14 private final long DELAY_EXPIRED_MILL_SECONDS = 5 * 60 * 1000;
15
16 public RedisDataStore(RedisTemplate<String, String> redisTemplate) {
17 this.redisTemplate = redisTemplate;
18 }
19
20 public boolean set(DelayMessage message) {
21 redisTemplate.opsForValue().set(MSG_KEY_PREFIX + message.getKey(), JSON.toJSONString(message),
22 message.getTimestamp() - System.currentTimeMillis() + DELAY_EXPIRED_MILL_SECONDS, TimeUnit.MILLISECONDS);
23 return true;
24 }
25
26 public DelayMessage get(String key) {
27 String value = redisTemplate.opsForValue().get(MSG_KEY_PREFIX + key);
28 if (StringUtils.isNotEmpty(value)) {
29 return JSON.parseObject(value, DelayMessage.class);
30 }
31 return null;
32 }
33
34 public Boolean zAdd(DelayMessage message) {
35 return redisTemplate.opsForZSet().add(ZSET_KEY_PREFIX + message.getTopic(), message.getKey(), message.getTimestamp());
36 }
37
38 public List<String> zRangeByScore(String key, long timestamp) {
39 Set<ZSetOperations.TypedTuple<String>> tupleSet = redisTemplate.opsForZSet()
40 .rangeByScoreWithScores(ZSET_KEY_PREFIX + key, 0, timestamp);
41 List<String> msgKeyList = new ArrayList<>();
42 if (tupleSet != null) {
43 tupleSet.forEach(item -> {
44 String val = item.getValue();
45 Double score = item.getScore();
46 if (score != null && score.longValue() < timestamp) {
47 msgKeyList.add(val);
48 }
49 });
50 }
51 return msgKeyList;
52 }
53
54 public boolean zRem(String key, List<String> values) {
55 Long result = redisTemplate.opsForZSet().remove(ZSET_KEY_PREFIX + key, values.toArray(new Object[0]));
56 return result != null && result.intValue() == values.size();
57 }
58
59 public boolean lPushAll(String key, List<String> values) {
60 Long result = redisTemplate.opsForList().leftPushAll(LIST_KEY_PREFIX + key, values);
61 return true;
62 }
63
64 public boolean lRem(String key, String value){
65 Long result = redisTemplate.opsForList().remove(LIST_KEY_PREFIX + key, 0, value);
66 return result != null && result.intValue() == 1;
67 }
68
69 public List<String> lRange(String key, int start, int end) {
70 return redisTemplate.opsForList().range(LIST_KEY_PREFIX + key, start, end);
71
72 }
73}
1public static void main(String[] args) {
2
3 RedisDataStore redisDataStore = new RedisDataStore(stringRedisTemplate);
4
5 List<String> topicList = Arrays.asList("DELAY_QUEUE_TOPIC1", "DELAY_QUEUE_TOPIC2");
6 DelayQueueBroker broker = new DelayQueueBroker(topicList, redisDataStore);
7 broker.start();
8
9 Map<String, DelayQueueSubscriber> subscriberMap = new HashMap<>();
10 subscriberMap.put("DELAY_QUEUE_TOPIC1", new Topic1MessageSubscriber());
11 subscriberMap.put("DELAY_QUEUE_TOPIC2", new Topic2MessageSubscriber());
12
13 DelayQueueConsumer consumer = new DelayQueueConsumer(topicList, subscriberMap, redisDataStore);
14 consumer.start();
15
16 DelayQueueSender sender = new DelayQueueSender(redisDataStore);
17 DelayMessage delayMessage = new DelayMessage();
18 delayMessage.setKey("TOPIC1_KEY_" + i);
19 delayMessage.setTopic("DELAY_QUEUE_TOPIC1");
20 delayMessage.setTimestamp(System.currentTimeMillis() + i * 1000);
21 delayMessage.setContent("TOPIC2_CONTENT_" + i);
22 sender.add(delayMessage);
23
24 }




