MQ应用有很多,比如ActiveMQ,RabbitMQ,Kafka等,但是也可以基于redis来实现,可以降低系统的维护成本和实现复杂度,本篇介绍redis中实现消息队列的几种方案。
基于List的 LPUSH+BRPOP 的实现
PUB/SUB,订阅/发布模式
基于Sorted-Set的实现
基于Stream类型的实现
基于异步消息队列List lpush-brpop(rpush-blpop)
使用rpush和lpush操作入队列,lpop和rpop操作出队列。
List支持多个生产者和消费者并发进出消息,每个消费者拿到都是不同的列表元素。
PUB/SUB,订阅/发布模式
SUBSCRIBE,用于订阅信道
PUBLISH,向信道发送消息
UNSUBSCRIBE,取消订阅
此模式允许生产者只生产一次消息,由中间件负责将消息复制到多个消息队列,每个消息队列由对应的消费组消费。
基于Sorted-Set的实现
Sortes Set(有序列表),类似于java的SortedSet和HashMap的结合体,一方面她是一个set,保证内部value的唯一性,另一方面它可以给每个value赋予一个score,代表这个value的
排序权重。内部实现是“跳跃表”。
有序集合的方案是在自己确定消息顺ID时比较常用,使用集合成员的Score来作为消息ID,保证顺序,还可以保证消息ID的单调递增。通常可以使用时间戳+序号的方案。确保了消息ID的单调递增,利用SortedSet的依据
Score排序的特征,就可以制作一个有序的消息队列了。
Redis从5.0开始引入了一种新的数据类型Stream类型,它是专门为消息队列设计的数据类型。

每个Stream都可以挂多个消费组,每个消费组会有个游标last_delivered_id在Stream数组之上往前移动,表示当前消费组已经消费到哪条消息了
同一份Stream内部的消息会被每个消费组都消费到。
同一个消费组(Consumer Group)可以挂接多个消费者(Consumer),这些消费者之间是竞争关系
消息队列主要是解决生产者和消费者处理能力不一致的问题。Redis Stream提供了消息的持久化和主从复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
首先,我们看Redis Stream提供了哪些操作命令:
XADD 添加消息到末尾
XTRIM 对流进行修剪,限制长度
XLEN 获取流包含的元素数量,即消息长度
XDEL 删除消息
XRANGE 获取消息列表,会自动过滤已经删除的消息
XREAD 以阻塞或非阻塞方式获取消息列表
XGROUP CREATE 创建消费组
XREADGROUP 按消费者的形式来读取消息
XACK 用于向消息队列确认消息处理已完成
XPENDING 查询每个消费组内所有消费者已读取但尚未确认的消息
一、常用命令
1. XADD
- 1文章转载自ExceptionGirl,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
评论




