暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

消息中间件RabbitMQ初识

原创 chirpyli 2025-03-26
268

关于消息中间件

消息传递是分布式系统必然要面对的一个问题,消息中间件解决的就是分布式系统之间消息传递的问题。消息中间件的应用场景大致如下:

  • 业务解耦:交易系统不需要知道短信通知服务的存在,只需要发布消息
  • 削峰填谷:比如上游系统的吞吐能力高于下游系统,在流量洪峰时可能会冲垮下游系统,消息中间件可以在峰值时堆积消息,而在峰值过去后下游系统慢慢消费消息解决流量洪峰的问题
  • 事件驱动:系统与系统之间可以通过消息传递的形式驱动业务,以流式的模型处理

当前主流的消息中间件:RabbitMQ、ActiveMQ和ZeroMQ。

消息中间件可以类比为现实世界中的物流系统,商家将商品发布到物流系统,物流系统将商品送到客户手中。商家和客户可以不用关心东西是怎么运输的,你只需将地址告诉它,剩下的就不用你担心了。

关于AMQP

AMQP(Advanced Message Queuing Protocol) 是一个提供统一消息服务的应用层标准协议。它的主要特征是面向消息、队列、路由(包括点对点和发布/订阅),且可靠性强、安全。AMQP协议是一种二进制协议,为客户端应用与消息中间件之间提供异步、安全、高效的交互。 基于此协议的客户端与消息中间件可传递消息,并不受客户端和中间件不同产品、不同开发语言等条件的限制。下面是从网上截取的图片,能更直观的去理解AMQP协议:
这里写图片描述
可以看到,基本流程就是发布者发布消息—>经过Exchange判断消息路由到那个队列—>具体队列—>订阅者从队列中取走消息。后面会详解。

rabbitmq基础概念

RabbitMQ 是一个由erlang开发的基于AMQP(Advanced Message Queue)协议的开源实现。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面都非常的优秀。是当前最主流的消息中间件之一。

Brocker: 消息队列服务器实体。主要由两个组件Exchange和Queue构成。而Producer和Consumer是客户端,由用户可根据不同的需要可以使用不同的语言实现。
这里写图片描述

Producer: 消息生产者,就是发布消息的客户端程序。

Consumer: 消息消费者,就是订阅消息的客户端程序。

Vhost: 虚拟主机,一个broker里可以开设多个vhost,用作不用用户的权限分离。每个virtual host本质上都是一个RabbitMQ Server,拥有它自己的queue,exchagne,和bindings rule等等(这样方便多个不同应用程序或进程使用同一消息队列服务器)。
这里写图片描述
Connection: 连接,就是一个TCP的连接。Producer和Consumer客户端通过TCP连接到rabbitmq服务器,一般客户端程序的起始处就是建立这个TCP连接。

Channels: 消息通道,在客户端的每个连接里,可建立多个channel(虚拟连接),每个channel代表一个会话任务。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。客户端程序一般情况是起始建立TCP连接,之后建立Channel。

那么,为什么使用Channel,而不是直接使用TCP连接?
对于OS来说,建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的。对于Producer或者Consumer来说,可以并发的使用多个Channel进行Publish或者Receive。

Exchange: 消息交换机,指定消息按什么规则,路由到哪个队列。通常分为四种:

  • fanout:该类型路由规则非常简单,会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,相当于广播功能
  • direct:该类型路由规则会将消息路由到binding key与routing key完全匹配的Queue中
  • topic:与direct类型相似,只是规则没有那么严格,可以模糊匹配和多条件匹配
  • headers:该类型不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配
    这里写图片描述

Queue: 消息队列,每个消息都会被投入到一个或者多个队列里。

Binding: 绑定,它的作用是把exchange和queue按照路由规则binding起来。

Routing Key: 路由关键字,exchange根据这个关键字进行消息投递。

rabbitmq客户端基本流程

这里写图片描述

消息接收:

  • 客户端连接到消息队列服务器,打开一个channel。
  • 客户端声明一个exchange,并设置相关属性。
  • 客户端声明一个queue,并设置相关属性。
  • 客户端使用routing key,在exchange和queue之间建立好绑定关系。

消息发布:

  • 客户端投递消息到exchange。
  • exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。(这一步其实是rabbitmq服务端实现的过程,不在客户端中)

rabbitmq若干细节

谁创建队列queue

Consumer和Procuder都可以通过 queue.declare 创建queue。对于某个Channel来说,Consumer不能declare一个queue,却订阅其他的queue。当然也可以创建私有的queue。这样只有app本身才可以使用这个queue。queue也可以自动删除,被标为auto-delete的queue在最后一个Consumer unsubscribe后就会被自动删除。那么如果是创建一个已经存在的queue呢?那么不会有任何的影响。需要注意的是没有任何的影响,也就是说第二次创建如果参数和第一次不一样,那么该操作虽然成功,但是queue的属性并不会被修改。

那么谁该负责创建这个queue呢?是Consumer,还是Producer?如果queue不存在,当然Consumer不会得到任何的Message。但是如果queue不存在,那么Producer Publish的Message会被丢弃。所以,为了数据不丢失,一般的做法是Consumer和Producer都try to create the queue!这样能够确保这个接口两者都不会出问题。

使用ack确认Message的正确传递

默认情况下,如果Message 已经被某个Consumer正确的接收到了,那么该Message就会被从queue中移除。当然也可以让同一个Message发送到很多的Consumer。如果一个queue没被任何的Consumer Subscribe(订阅),那么,如果这个queue有数据到达,那么这个数据会被cache,不会被丢弃。当有Consumer时,这个数据会被立即发送到这个Consumer,这个数据被Consumer正确收到时,这个数据就被从queue中删除。

那么什么是正确收到呢?通过ack。每个Message都要被acknowledged(确认,ack)。我们可以显示的在程序中去ack,也可以自动的ack。如果有数据没有被ack,那么RabbitMQ Server会把这个信息发送到下一个Consumer。如果这个app有bug,忘记了ack,那么RabbitMQ Server不会再发送数据给它,因为Server认为这个Consumer处理能力有限。

ack的机制还可以起到限流的作用:在Consumer处理完成数据后发送ack,甚至在额外的延时后发送ack,将有效的balance Consumer的load。当然对于实际的例子,比如我们可能会对某些数据进行merge,比如merge 4s内的数据,然后sleep 4s后再获取数据。特别是在监听系统的state,我们不希望所有的state实时的传递上去,而是希望有一定的延时。这样可以减少某些IO,而且终端用户也不会感觉到。

RabbitMQ的安装与使用

安装部署Rabbitmq的时候要注意erlang与rabbitmq的版本问题,可参考
RabbitMQ Erlang Version Requirements选择对应的版本进行安装部署。

安装可参考官网文档Installing on Debian and Ubuntu

使用示例可以参考官网给的python代码示例。

安装Erlang

  1. Adding repository entry
wget https://packages.erlang-solutions.com/erlang-solutions_1.0_all.deb sudo dpkg -i erlang-solutions_1.0_all.deb
  1. Installing Erlang
sudo apt-get update sudo apt-cache madison esl-erlang #列出版本后,选择一个版本安装 sudo apt install esl-erlang=1:20.3

安装rabbitmq

  1. Add apt repositories
echo "deb https://dl.bintray.com/rabbitmq/debian xenial main" | sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list
  1. Next add our public key to your trusted key list using apt-key(8):
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
  1. Run the following command to update the package list:
sudo apt-get update
  1. Install rabbitmq-server package:
sudo apt install rabbitmq-server=3.7.6-1

启动rabbitmq

启动rabbitmq:

sudo service rabbitmq-server start

Rabbitmq常用命令

查看rabbimtq服务状态:service rabbitmq-server status:

ubuntu@localhost:~$ sudo service rabbitmq-server status ● rabbitmq-server.service - RabbitMQ broker Loaded: loaded (/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: enabled) Active: active (running) since Fri 2019-08-23 11:14:07 CST; 8min ago Main PID: 57802 (beam.smp) Status: "Initialized" Tasks: 90 Memory: 79.0M CPU: 5.419s CGroup: /system.slice/rabbitmq-server.service ├─57802 /usr/local/lib/erlang/erts-9.3/bin/beam.smp -W w -A 64 -MBas ageffcbf -MHas ageffcb ├─57888 /usr/local/lib/erlang/erts-9.3/bin/epmd -daemon ├─58074 erl_child_setup 1024 ├─58096 inet_gethost 4 └─58097 inet_gethost 4 Aug 23 11:14:05 localhost rabbitmq-server[57802]: ## ## Aug 23 11:14:05 localhost rabbitmq-server[57802]: ## ## RabbitMQ 3.7.6. Copyright (C) 2007-201 Aug 23 11:14:05 localhost rabbitmq-server[57802]: ########## Licensed under the MPL. See http://ww Aug 23 11:14:05 localhost rabbitmq-server[57802]: ###### ## Aug 23 11:14:05 localhost rabbitmq-server[57802]: ########## Logs: /var/log/rabbitmq/rabbit@localho Aug 23 11:14:05 localhost rabbitmq-server[57802]: /var/log/rabbitmq/rabbit@localho Aug 23 11:14:05 localhost rabbitmq-server[57802]: Starting broker... Aug 23 11:14:07 localhost rabbitmq-server[57802]: systemd unit for activation check: "rabbitmq-server. Aug 23 11:14:07 localhost systemd[1]: Started RabbitMQ broker. Aug 23 11:14:08 localhost rabbitmq-server[57802]: completed with 0 plugins.

下面列出一些常用的命令,更多可参考rabbitmqctl官网

sudo chkconfig rabbitmq-server on #添加开机启动(chkconfig一般只有redhat系统有)RabbitMQ服务 sudo service rabbitmq-server start # 启动服务 sudo service rabbitmq-server status # 查看服务状态 sudo service rabbitmq-server stop # 停止服务 sudo rabbitmqctl stop # 停止服务 sudo rabbitmqctl status # 查看服务状态 sudo rabbitmqctl list_users # 查看当前所有用户 sudo rabbitmqctl list_user_permissions guest # 查看默认guest用户的权限 sudo rabbitmqctl delete_user guest# 删掉默认用户(由于RabbitMQ默认的账号用户名和密码都是guest。为了安全起见, 可以删掉默认用户) sudo rabbitmqctl add_user username password # 添加新用户 sudo rabbitmqctl set_user_tags username administrator# 设置用户tag sudo rabbitmqctl set_permissions -p / username ".*" ".*" ".*" # 赋予用户默认vhost的全部操作权限 sudo rabbitmqctl list_user_permissions username # 查看用户的权限

开启web管理接口

rabbitmq既可以命令行操作,也可以用rabbitmq自带的web管理界面,只需要启动插件便可以使用。

sudo rabbitmq-plugins enable rabbitmq_management

然后通过浏览器访问,如果是本机则可以输入http://127.0.0.1:15672打开登录界面,输入用户名和密码访问web管理界面了。默认用户名guest密码guest。

更多可参考Management Plugin

Rabbitmq配置

一般使用默认配置就好了,具体配置可参考RabbitMQ Configuration

rabbitmq端口

以下为rabbitmq的默认端口,如果这些端口被其他程序占用,rabbitmq就会出错。

  • 4369: epmd, a peer discovery service used by RabbitMQ nodes and CLI tools
  • 5672, 5671: used by AMQP 0-9-1 and 1.0 clients without and with TLS
  • 25672: used for inter-node and CLI tools communication (Erlang distribution server port) and is allocated from a dynamic range (limited to a single port by default, computed as AMQP port + 20000). See networking guide for details.
  • 35672-35682: used by CLI tools (Erlang distribution client ports) for communication with nodes and is allocated from a dynamic range (computed as Erlang dist port + 10000 through dist port + 10010). See networking guide for details.
  • 15672: HTTP API clients and rabbitmqadmin (only if the management plugin is enabled)
  • 61613, 61614: STOMP clients without and with TLS (only if the STOMP plugin is enabled)
  • 1883, 8883: (MQTT clients without and with TLS, if the MQTT plugin is enabled
  • 15674: STOMP-over-WebSockets clients (only if the Web STOMP plugin is enabled)
  • 15675: MQTT-over-WebSockets clients (only if the Web MQTT plugin is enabled)

如果有相冲突的端口的话,可以参考Networking and RabbitMQ更改rabbitmq配置或者去down掉占用端口的那个应用程序或进程吧。

参考文档:
一篇文章讲透彻了AMQP协议
AMQP协议

最后修改时间:2025-04-21 13:52:31
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论