暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

docker部署rabbitmq、使用DelayExchange插件实现延时消息发送

原创 . 2024-02-27
497

认识RabbitMQ及AMQP协议

RabbitMQ

  • RabbitMQ是采用Erlang编写的、实现高级消息队列协议(Advanced Message Queuing Protocol,AMQP)规范的一种**开源**消息代理服务器
  • RabbitMQ客户端官方支持Erlang,Java,Ruby等,社区产出多种API,几乎支持所有语言
  • RabbitMQ非常轻量,运行核心功能以及诸如管理界面的插件只需要不到40MB内存,性能很好,单机吞吐量可达万级
  • 依托于Erlang天生支持并发的特点,RabbitMQ支持集群部署,支持镜像队列,允许水平扩展
  • 支持众多第三方插件,社区活跃,文档较多

AMQP协议

  • AMQP协议是一个具有现代特征的二进制协议。一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制
  • 整个AMQP协议的过程:由生产者发布消息到交换机,交换机再通过路由规则将消息发送到不同的队列中去存储,然后消费者从队列中监听拿走对应的消息来消费

部署Rabbitmq单节点

  • 拉取rabbitmq镜像
docker pull rabbitmq:management
  • 准备配置文件
    # 创建文件夹
    mkdir /app/mq/data
    # 编辑配置
    vim /app/mq/rabbitmq.conf
    # 配置内容
    #开启默认账号
    loopback_users.guest = true
    #监听对外通信端口配置
    listeners.tcp.default = 5672
    #集群节点发现配置
    cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
    #集群节点1(后续基于节点1扩展多节点)
    cluster_formation.classic_config.nodes.1 = rabbit@rabbitmq1
  • 配置 cookie
vim /app/mq/.erlang.cookie

BVWDHYVHEZGVSKPDYUDK
**集群中各节点.erlang.cookie内容应一致**
# 修改cookie文件的权限
chmod 600 /caso/mq/.erlang.cookie
  • 创建docker共享网络
docker network create mq-net
  • 启动容器
    docker run -d --hostname rabbitmq1 --restart unless-stopped --name rabbitmq1 --net mq-net -p 4369:4369 -p 25672:25672 -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123456 -v /app/mq/data:/var/lib/rabbitmq -v /app/mq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf -v /app/mq/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie rabbitmq:management
    
    参数解释:
    -d 容器后台运行
    --hostname rabbitmq1 容器的主机名为 rabbitmq1
    --restart=unless-stopped docker 容器重启后重启MQ
    --name rabbitmq1 容器名为rabbitma1,在宿主机上运行“docker ps”命令时显示的名称
    -p 4369:4369 EPMD(Erlang端口映射守护进程)端口号
    -p 25672:25672 节点间和CLI工具通信(Erlang分发服务器端口)
    -p 5672:5672 消息通讯端口
    -p 15672:15672 后台管理端口
    -e RABBITMQ_DEFAULT_USER=root 设置rabbitmq默认用户为root
    -e RABBITMQ_DEFAULT_PASS=123456 设置rabbitmq默认密码为123456
    -v /app/mq/data:/var/lib/rabbitmq 挂载/app/mq/data目录到容器
    -v /app/mq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf 映射配置文件
    -v /app/mq/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie 映射cookie文件,也可以使用-e RABBITMQ_ERLANG_COOKIE='rabbitmq' 设置 rabbitmq的cookie为“rabbitmq”,可以自定义为其他文本,容器保持一致即可。因为新版本中RABBITMQ_ERLANG_COOKIE环境变量方式已被标记为弃用,在需要加载cookie的命令执行时会有警告,未来版本该方式会被移除,推荐映射文件方式部署
  •  验证是否启动成功

浏览器访问管理控制台 http://ip:15672/,输入账号、密码 登录

  • 安装延时消息插件
# 下载插件包
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
**注意插件版本要与rabbitmq版本对应**
mkdir /app/mq/mq-plugins
将插件上传到对应目录
# 拷贝插件到容器
docker cp /app/mq/mq-plugins/rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq1:/plugins
# 进入容器
docker exec -it rabbitmq1 /bin/bash
# 验证插件是否存在
cd plugins
ls |grep delay
# 启动延时消息插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • rabbitmq安装包中自带消息追踪日志插件,为了方便排查问题可以提前开启插件
# 启动消息追踪日志插件
rabbitmq-plugins enable rabbitmq_tracing

[插件使用方法](https://github.com/MarcialRosales/rabbitmq-tracing-guide)
#
开启一个消息追踪
#开启的消息追踪
#日志内容
  • 退出容器,重启使新安装插件生效
exit
docker restart rabbitmq1

rabbitmq集群单节点部署完成,在spring-boot项目中实现消息发送、延时消息消费及异常消息存储

  • 依赖
<!-- 消息中间件 rabbitmq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>3.0.4</version>
</dependency>
  • rabbitmq客户端配置
spring:
  rabbitmq:
    host: 192.168.0.210
    port: 5672
    username: root
    password: 123456
    # 消费者确认机制相关配置
    # 开启publisher-confirm,
    # 这里支持两种类型:simple:同步等待confirm结果,直到超时;# correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调ConfirmCallback
    publisher-confirm-type: correlated
    # publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
    publisher-returns: true
    # 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
    template:
    mandatory: true
    listener:
        simple:
            # 一次拉取的数量(默认值:250,强制要求顺序消费配置1)
            prefetch: 250
            # 消费端的监听个数(即@RabbitListener开启几个线程去处理数据。)
            concurrency: 10
            # 消费端的监听最大个数
            max-concurrency: 100
            # 消息手动确认
            acknowledge-mode: manual
            # 消费者retry重试机制
            retry:
                enabled: true
                # 每次失败后等待时长的翻倍数
                multiplier: 1
                # 最大重试次数
                max-attempts: 3
                # 重试状态
                stateless: true
        direct:
            auto-startup: true
            acknowledge-mode: manual
            retry:
                enabled: true
                max-attempts: 3
        type: simple


  • 配置延时消息生产者消费者交换机、队列、路由


@Configuration
public class MqProducerConsumerConfig {
/**
* 延迟交换机
*/
public static final String DELAY_EXCHANGE_NAME = "member.delay.exchange";
/**
* 延迟队列
*/
public static final String DELAY_QUEUE_NAME = "member.delay.queue";
/**
* 延迟队列路由key
*/
public static final String DELAY_QUEUE_ROUTING_KEY = "delay.routingKey";

/**
* 声明延迟交换机
*
* @return CustomExchange 延迟交换机
*/
@Bean
public CustomExchange delayExchange() {
HashMap<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}

/**
* 声明延迟队列
*
* @return Queue 延迟队列
*/
@Bean
public Queue delayQueue() {
return new Queue(DELAY_QUEUE_NAME);
}

/**
* 设置延迟队列的绑定关系
*
* @return Binding 延迟队列的绑定关系
*/
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_QUEUE_ROUTING_KEY).noargs();
}
}
```
- 配置异常消息生产者交换机、队列、路由

```java
@Configuration
public class MqErrorMsgConfig {
/**
* 交换机
*/
public static final String EXCHANGE_NAME = "member.error.exchange";
/**
* 队列
*/
public static final String QUEUE_NAME = "member.error.queue";
/**
* 队列路由key
*/
public static final String QUEUE_ROUTING_KEY = "error";

/**
* 声明失败消息交换机
*
* @return 交换机
*/
@Bean
public DirectExchange errorExchange() {
return new DirectExchange(EXCHANGE_NAME);
}

/**
* 声明失败消息队列
*
* @return 消息队列
*/
@Bean
public Queue errorQueue() {
// 设置队列持久化
return new Queue(QUEUE_NAME, true);
}

/**
* 绑定交换机、消息队列,指定对应的队列路由key
*
* @return 绑定
*/
@Bean
public Binding errorBind() {
return BindingBuilder.bind(errorQueue()).to(errorExchange()).with(QUEUE_ROUTING_KEY);
}

/**
* 实现MessageRecover接口的子类RepublishMessageBuilder
*
* @param rabbitTemplate rabbitTemplate
* @return 异常消息
*/
@Bean
public MessageRecoverer republishMessageBuilder(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, EXCHANGE_NAME, QUEUE_ROUTING_KEY);
}
}
```
- 封装消息发送组件

```java
@Component
public class MqProducerMsgSendUtil {
private static final Logger log = LoggerFactory.getLogger(MqProducerMsgSendUtil.class);
@Resource
private RabbitTemplate rabbitTemplate;

public MqProducerMsgSendUtil() {
}

public void send(String delayExchangeName, String delayQueueRoutingKey, String msg, Integer delayTime) {
log.info("发送mq消息:{},延时:{} ms接收", msg, delayTime);
this.rabbitTemplate.convertAndSend(delayExchangeName, delayQueueRoutingKey, msg, (message) -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setHeader("x-delay", delayTime);
return message;
});
}
}
  • 延时消息消费业务
@Slf4j
@Component
public class RabbitMqMessageListener {

@Autowired
private MqProducerMsgSendUtil mqProducerMsgSendUtil;

/**
* rabbitmq延迟消息监听
*
* @param message 消息
* @param c 信道
* @param msg 消息内容
*/
@RabbitListener(queues = DELAY_QUEUE_NAME)
public void receiveMqDelayMsg(Message message, Channel c, String msg) throws Exception {
MessageProperties properties = message.getMessageProperties();
String receivedExchange = properties.getReceivedExchange();
String consumerQueue = properties.getConsumerQueue();
String routingKey = properties.getReceivedRoutingKey();
log.info("收到 交换机:{} 队列:{} 路由键:{} 消息:{}", receivedExchange, consumerQueue, routingKey, msg);

Boolean haveConsumer = false;

// 是需要处理的消息
if (msg.contains(RedisKeyConstants.PAY_ORDER_STATE)) {
haveConsumer = true;
log.info("执行xxx业务 >>> 消息:{}", msg);

...
...
...

// 手动回执,签收
long tag = properties.getDeliveryTag();
c.basicAck(tag, false);
}

// 没有合适消费者,抛异常消息重发
if (!haveConsumer) {
long tag = properties.getDeliveryTag();
// 应答,防止Unacked堆积
c.basicAck(tag, false);
log.error("交换机:{} 队列:{} 路由键:{} 消息:{} 没有正常消费,异常重试!", receivedExchange, consumerQueue, routingKey, msg);
// 需要将异常抛出才能触发retry重试机制
throw new Exception(MqExceptionEnum.MESSAGE_CONSUMPTION_FAILED.getMsg());
}
}
}
高可用方面
  • 集群部署

    将rabbitmq1作为节点1,在另外两台服务其中部署节点2、节点3
    配置、cookie与节点1相同

# 节点2
docker network create mq-net
docker run -d --hostname rabbitmq2 --add-host rabbitmq3:x.x.x.x --add-host rabbitmq1:x.x.x.x --restart unless-stopped --name rabbitmq2 --net mq-net -p 4369:4369 -p 25672:25672 -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123456 -v /app/mq/data:/var/lib/rabbitmq -v /app/mq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf -v /app/mq/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie rabbitmq:management

# 节点3
docker network create mq-net
docker run -d --hostname rabbitmq3 --add-host rabbitmq2:x.x.x.x --add-host rabbitmq1:x.x.x.x --restart unless-stopped --name rabbitmq3 --net mq-net -p 4369:4369 -p 25672:25672 -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123456 -v /app/mq/data:/var/lib/rabbitmq -v /app/mq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf -v /app/mq/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie rabbitmq:management

# 节点1 host增加节点2、节点3映射
docker exec -it rabbitmq1 /bin/bash
apt-get update
apt-get install vim
vim /etc/hosts
# 节点2
x.x.x.x rabbitmq2
# 节点3
x.x.x.x rabbitmq3
#重新加载节点;
#停止MQ
rabbitmqctl stop_app;
#resetMQ
rabbitmqctl reset;
#启动MQ
rabbitmqctl start_app;
#退出并重启容器
exit;
docker restart rabbitmq1;

# 节点2加入
#进入容器
docker exec -it rabbitmq2 bash;
#停止MQ
rabbitmqctl stop_app;
#resetMQ
rabbitmqctl reset;
#启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange;
#跟机器1的消息队列建立关系
rabbitmqctl join_cluster --ram rabbit@rabbitmq1;
#修改节点类型
rabbitmqctl change_cluster_node_type disc
#重新启动MQ
rabbitmqctl start_app;
#退出并重启容器
exit;
docker restart rabbitmq2;

# 节点3加入
#进入容器
docker exec -it rabbitmq3 bash;
#停止MQ
rabbitmqctl stop_app;
#resetMQ
rabbitmqctl reset;
#启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange;
#跟机器1的消息队列建立关系
rabbitmqctl join_cluster --ram rabbit@rabbitmq1;
#修改节点类型
rabbitmqctl change_cluster_node_type disc
#重新启动MQ
rabbitmqctl start_app;
#退出并重启容器
exit;
docker restart rabbitmq3;


  • 使用HAProxy/Nginx进行负载均衡,使用keepalived进行负载均衡中间件保活、路由

        keepalived原理是基于VRRP(虚拟路由选择协议)实现高可用性,即在多个服务器之间动态地分配IP地址和路由功能,可以在自建主机、虚拟机、支持VRRP的云服务器中部署,不支持虚拟IP的环境下部署无效!

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论