暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Redis实现消息队列

ExceptionGirl 2021-08-26
904

MQ应用有很多,比如ActiveMQ,RabbitMQ,Kafka等,但是也可以基于redis来实现,可以降低系统的维护成本和实现复杂度,本篇介绍redis中实现消息队列的几种方案。

  • 基于List的 LPUSH+BRPOP 的实现

  • PUB/SUB,订阅/发布模式

  • 基于Sorted-Set的实现

  • 基于Stream类型的实现

基于异步消息队列List lpush-brpop(rpush-blpop)

使用rpushlpush操作入队列,lpoprpop操作出队列。

List支持多个生产者和消费者并发进出消息,每个消费者拿到都是不同的列表元素。

PUB/SUB,订阅/发布模式

SUBSCRIBE,用于订阅信道

PUBLISH,向信道发送消息

UNSUBSCRIBE,取消订阅

此模式允许生产者只生产一次消息,由中间件负责将消息复制到多个消息队列,每个消息队列由对应的消费组消费。

基于Sorted-Set的实现

Sortes Set(有序列表),类似于java的SortedSet和HashMap的结合体,一方面她是一个set,保证内部value的唯一性,另一方面它可以给每个value赋予一个score,代表这个value的

排序权重。内部实现是“跳跃表”。

有序集合的方案是在自己确定消息顺ID时比较常用,使用集合成员的Score来作为消息ID,保证顺序,还可以保证消息ID的单调递增。通常可以使用时间戳+序号的方案。确保了消息ID的单调递增,利用SortedSet的依据

Score排序的特征,就可以制作一个有序的消息队列了。

Redis从5.0开始引入了一种新的数据类型Stream类型,它是专门为消息队列设计的数据类型。


每个Stream都可以挂多个消费组,每个消费组会有个游标last_delivered_id在Stream数组之上往前移动,表示当前消费组已经消费到哪条消息了

同一份Stream内部的消息会被每个消费组都消费到

同一个消费组(Consumer Group)可以挂接多个消费者(Consumer),这些消费者之间是竞争关系

   消息队列主要是解决生产者和消费者处理能力不一致的问题。Redis Stream提供了消息的持久化和主从复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。


  • 首先,我们看Redis Stream提供了哪些操作命令:

  • XADD 添加消息到末尾

  • XTRIM 对流进行修剪,限制长度

  • XLEN 获取流包含的元素数量,即消息长度

  • XDEL 删除消息

  • XRANGE 获取消息列表,会自动过滤已经删除的消息

  • XREAD 以阻塞或非阻塞方式获取消息列表

  • XGROUP CREATE 创建消费组

  • XREADGROUP 按消费者的形式来读取消息

  • XACK 用于向消息队列确认消息处理已完成

  • XPENDING 查询每个消费组内所有消费者已读取但尚未确认的消息

  • 一、常用命令

  • 1. XADD

  • 1
    文章转载自ExceptionGirl,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

    评论