redis 之延时队列
常用的消息队列
我们常见的 MQ 有 RabbmitMq、Kafka、RecoketMQ 等等,他们都是专业级的消息队列,各有各的特性。但他们共同的特性就是比较复杂。有没有一种非常简单,同样也非常高效的消息队列呢? 使用 redis 我们可以非常简单的实现一个高效的消息队列。
redis 的消息队列使用场景
使用 redis 实现的消息队列虽然简单,但是与专业级的相比,肯定是有其不足之处。比如消息的重发、ack 机制、消息持久化等等。当遇到以下场景时,可以考虑使用 redis。
如果你的需求是快产快消的即时消费场景,并且生产的消息立即被消费者消费掉 如果速度是你十分看重的,比如慢了一秒好几千万这种 如果允许出现消息丢失的场景 如果你不需要系统保存你发送过的消息,做到来无影去无踪 需要处理的数据量并不是那么巨大
redis 消息队列的简单实现
首先一个简单的消息队列我们需要两个角色,生产者和消费者。生产者负责生产消息,往队列里面存储,消费者负责监听这个队列,一发现有消息,就立即读出来。

这里我们使用 reids 的 List 来实现消息队列,生产者使用 rpush 往队列里写数据,消费者使用 brpop 阻塞读去读取队列中的数据,这样消费者就可以做到类似于监听的效果,只有队列中有数据就能立即读出来。为避免一些重复的代码,我们先写一个 RedisConnection 类,用来获取 redis 连接
public class RedisConnection {
private static String host = "localhost";
private static int port = 6379;
public static Jedis getConnection(){
JedisPool jedisPool = new JedisPool(host,port);
return jedisPool.getResource();
}
}
先写一个生产者 Producer
class Producer implements Runnable{
@Override
public void run() {
Jedis jedis = RedisConnection.getConnection();
int i =0;
while (true){
String s = String.valueOf(i++);
//生产者往队列里写入数据
jedis.rpush("MSG_PIPELINE", s);
System.out.println("Producer write in redis = "+ s);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
System.out.println("生产者线程被打断");
}
}
}
}
再写一个消费者 Consumer
class Consumer implements Runnable{
@Override
public void run() {
//消费者使用阻塞读来实现未读取到消息时线程等待
Jedis jedis = RedisConnection.getConnection();
while (true){
/**
* int timeout 单次等待超时时间。如果超过这个时间没有获取到队列中的消息,则会返回null,
* 并且再次重试获取队列中的消息
* String key redis List 的key
*/
List<String> rpop = jedis.brpop(10,"MSG_PIPELINE");
System.out.println("Consumer read redis = "+rpop);
}
}
}
最后用一个 main 方法启动这两个线程,这样一个简单的基于 redis 的消息队列就实现了。
public static void main(String[] args) {
//启动生产者
new Thread(new Producer()).start();
//启动消费者
new Thread(new Consumer()).start();
}
日志打印
Producer write in redis = 0
Consumer read redis = [MSG_PIPELINE, 0]
Producer write in redis = 1
Consumer read redis = [MSG_PIPELINE, 1]
Producer write in redis = 2
Consumer read redis = [MSG_PIPELINE, 2]
Producer write in redis = 3
Consumer read redis = [MSG_PIPELINE, 3]
redis 实现延时队列
延时队列的使用场景还是很多的,比如说 12306 购买火车票的时候超过 30 分钟未付款就自动取消,某宝秒杀抢购活动,超过一定时间未付款就取消订单,商品锁定的库存就会回到库存池里,购买电影票等等。使用 redis 的 ZSet,也能简单实现一个延时队列。
我们创建一个 RedisDelayQueue 用来操作读写队列。
public class RedisDelayQueue<T> {
/**
* 消息体
*/
static class TaskItem<T> {
private T msg;
private int delayScore;
}
/**
* 队列名称
*/
private String queueKey;
public RedisDelayQueue(String queueKey) {
this.queueKey = queueKey;
}
/**
* 往队列写入消息
* @param msg 消息
* @param delayScore 延迟时间
*/
public void delay(T msg, int delayScore) {
}
/**
* 轮询获取消息队列中的消息
*/
public void loop(){
}
}
接下来我们编写一下关键的两个函数,delay 函数是往队列中写入数据。 消息被读取的时间 = **delayScore **+ 当前时间,在入参的时候,只需要写入需要等待的时间即可。
/**
* 往队列写入消息
* @param msg 消息
* @param delayScore 延迟时间
*/
public void delay(T msg, int delayScore) {
Jedis jedis = RedisConnection.getConnection();
TaskItem < T > task = new TaskItem < T > ();
task.msg = msg;
task.delayScore = delayScore;
String s = JSON.toJSONString(task);
System.out.println("producer线程池id=" + Thread.currentThread().getId() + ",写入延迟队列,val=" + JSON.toJSONString(task));
//写入延迟队列
jedis.zadd(queueKey, System.currentTimeMillis() + delayScore, s);
}
loop 函数是一个阻塞读函数,当队列没有数据时,线程会休眠 500 毫秒。
/**
* 轮询获取消息队列中的消息
*/
public void loop() {
System.out.println("consumer线程id=" + Thread.currentThread().getId() + "启动");
while (!Thread.interrupted()) {
Jedis jedis = RedisConnection.getConnection();
Set < String > values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
if (values.isEmpty()) {
try {
System.out.println("consumer没有读取到数据,线程休眠");
Thread.sleep(500);
} catch (InterruptedException e) {
System.out.println("线程被打断");
//如果线程被打断,则退出当前线程
break;
}
continue;
}
String msg = values.iterator().next();
//根据当前获取的值,尝试删除。如果删除成功则为只有当前线程获取到了本条消息
if (jedis.zrem(queueKey, msg) > 0) {
TaskItem < T > taskItem = JSON.parseObject(msg, TaskItem.class);
System.out.println("线程池id=" + Thread.currentThread().getId() + ",获取到了消息:" + taskItem.msg);
}
}
}
到此我们就用 redis 实现了延迟队列。当然这是一个非常简单的模型,只是为这种实现方式提供了一种思路。具体在生产环境的话,还需要考虑多种因素。
-END-
如果看到这里,喜欢这篇文章的话,请帮点个好看。微信搜索「一个优秀的废人」,关注后回复「 1024」送你一套完整的 java 教程(包括视频)。回复「 电子书」送你全编程领域电子书 (不只Java)。







