暂无图片
暂无图片
3
暂无图片
暂无图片
暂无图片

RabbitMQ:整合Springboot、集群架构、应用场景及解决方案

一叶扁舟 2020-12-29
2259

image.png

目录

一、概述

1.1、概述

概述

RabbitMQ 相对后来者 Kafka,功能更加单一,是一款典型的“只做好一件事”的软件。

其核心抽象是一个队列 (Queue) ,用户通过 RabbiMQ 定义一组队列用于将内容在不同应用之间传递。

几款MQ对比:ActiveMQ、RocketMQ、Kafka、RabbitMQ

  • 支持语言
    • ActiveMQ、RocketMQ仅支持Java
    • Kafka、RabbitMQ支持多种语言
  • 效率
    • ActiveMQ、RocketMQ、Kafka是毫秒级别
    • RabbitMQ是微秒级别
  • 学习成本
    • RabbitMQ较为简单

1.2、Docker安装RabbitMQ

  • 查看镜像

    docker search rabbitmq:management
  • 拉取镜像(拉取带management的原因是这种带网页端图形化管理界面)

    docker pull rabbitmq:management
  • 启动容器

    • 命令方式启动

      docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

      注意:这里两个-p,分别代表 -p 服务器端口号:容器端口号 -p 服务器端口号:容器端口号

      • 5672用于提供Rabbitmq服务
      • 15672用于提供网页端图形化管理界面访问
    • docker-compose.yml方式启动

      version: "3.1" services: rabbitmq: image: daocloud.io/library/rabbitmq:management restart: always container_name: rabbitmq ports: - 5672:5672 - 15672:15672 volumes: # 由于rabbitmq可以对消息进行持久化 - ./data:/var/lib/rabbitmq
      # 使用docker-compose启动 docker-compose up -d
  • 查看是否启动

    docker ps

1.3、RabbitMQ图形管理界面使用

Rabbit可视化界面基本使用参看:python操作消息中间件RabbitMQ

二、RabbitMQ构造图

一个RabbitMQ可以有多个Virtual Host

一个Virtual Host可以有多个Exchange

一个Exchange多个Queue

image.png

三、JAVA操作RabbitMQ

3.1、RabbitMQ通信方式

RabbitMQ的7种通信方式

3.2、Java操作RabbitMQ

导入依赖

<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency>

创建连接

public class RabbitMQClient { public static Connection getConection(){ // 创建Connection工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("test"); factory.setPassword("test"); factory.setVirtualHost("/test"); // 创建Connection Connection conn = null; try { conn = factory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } // 返回连接 return conn; } }

测试连接

public void getConnection() throws IOException { Connection connection = RabbitMQClient.getConection(); connection.close(); }

3.3、Java实现RabbitMQ通信方式

3.3.1、方式一:“Hello World!”

image.png

特点

一个生产者、一个默认的交换机、一个队列、一个消费者

一个队列一个消费者

代码实现

  • 生产者

    public class Publisher { @Test public void publish() throws Exception { // 1、获取连接对象 Connection connection = RabbitMQClient.getConection(); // 2、创建Channel Channel channel = connection.createChannel(); // 3、发布消息到exchange,同时指定路由的规则 String msg = "Hello-World-msg!"; // 参数1:指定exchange, 默认的使用"" // 参数2:指定路由规则(routingKey),使用具体的队列名称 // 参数3:指定传递消息所携带的properties(属性) // 参数4:指定发布消息的具体内容,byte[]类型 channel.basicPublish("", "Hello World", null, msg.getBytes()); System.out.println("生产者发布消息成功!"); // 4、释放资源 channel.close(); connection.close(); } }
  • 消费者

    public class Consumer { @Test public void consume() throws Exception { // 1、获取连接对象 Connection connection = RabbitMQClient.getConection(); // 2、创建Channel Channel channel = connection.createChannel(); // 3、声明队列(一定要与生产者指定的队列相同) // 参数1:queue-指定消费队列的名称 // 参数2:durable-当前队列是否需要持久化(true) // 参数3:exclusive-是否排外(效果:1、conn.close-当前队列将会被消除;2、当前队列只能被一个消费者消费) // 参数4:autoDelete-如果这个队列没有消费者消费,则自动删除 // 参数5:arguments-指定当前队列的其他信息(比如最大存多少) channel.queueDeclare("Hello World", true, false, false, null); // 4、开启监听队列并消费 // 创建消费者,并重写handleDelivery方法,对消息进行处理 DefaultConsumer consume = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到消息:" + new String(body, "UTF-8")); } }; // 参数1:queue-指定消费队列的名称 // 参数2:autoAck-指定是否自动ACK(true:接到消息后,会立即告诉RabbitMQ消费完毕,false:需要手动) // 参数3:consumer-指定消费回调 channel.basicConsume("Hello World", true, consume); // 消费掉 System.in.read(); // 【保证不停止运行,一直监听】 // 5、释放资源 channel.close(); connection.close(); } }

3.3.2、方式二:Work queues

image.png

特点

一个发布者、一个默认的交换机、一个队列、两个消费者

一个队列两个消费者

原理

【问题】:RabbitMQ默认是平均分配的,但是由于不同消费者能力不同,如果平均分配会导致有的很闲有的压力很大

【解决】:1、指定消费能力basicQos。2、手动ACK

代码实现

  • 生产者

    channel.basicPublish("", "Work", null, msg.getBytes()); //往Work队列中发送消息
  • 消费者

    • 消费者1

      public class Consumer { @Test public void consume() throws Exception { // 1、获取连接对象 Connection connection = RabbitMQClient.getConection(); // 2、创建Channel Channel channel = connection.createChannel(); // 3、声明队列(一定要与生产者指定的队列相同) channel.queueDeclare("Work", true, false, false, null); // 【改动1】:指定消费能力 channel.basicQos(1); // 4、开启监听队列并消费 DefaultConsumer consume = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1号,接收到消息:" + new String(body, "UTF-8")); // 【改动2】手动ack channel.basicAck(envelope.getDeliveryTag(), false); } }; //【改动2】:false(换为手动ACK) channel.basicConsume("Work", false, consume); // 消费掉 System.in.read(); // 【保证不停止运行,一直监听】 // 5、释放资源 channel.close(); connection.close(); } }
    • 消费者2

      channel.basicQos(2); // 【改动1】:指定消费能力 channel.basicAck(envelope.getDeliveryTag(), false); // 【改动2】手动ack channel.basicConsume("Work", false, consume); //【改动2】:false(换为手动ACK)

3.3.3、方式三:Publish/Subscribe

image.png

特点

一个生产者,一个自己创建的交换机,两个队列,两个消费者

两个队列两个消费者,每个队列中发一份相同的

原理

  • 生产者:

    • 创建一个FANOUT类型的exchange
    • 并把多个队列绑定到交换机上(exchange)
    • 发送消息指定上交换机
  • 消费者:指定要监听的队列

代码实现

  • 生产者

    public class Publisher { @Test public void publish() throws Exception { // 1、获取连接对象 Connection connection = RabbitMQClient.getConection(); // 2、创建Channel Channel channel = connection.createChannel(); // 【改动1】指定交换机「交换机名, 交换机类型, 是否持久化」 channel.exhangeDeclare("PS-exchange", BuiltExchangeType.FANOUT, true) // 【改动2】绑定指定队列「队列,交换机,绑定形式」 channel.queueBind("ps-queue1", "PS-exchange", ""); channel.queueBind("ps-queue2", "PS-exchange", ""); // 3、发布消息到exchange,同时指定路由的规则 String msg = "Publish/Subscribe-msg"; //【改动3】:填写指定上交换机,队列置为空(不指定队列) channel.basicPublish("PS-exchange", "", null, msg.getBytes()); System.out.println("生产者发布消息成功!"); // 4、释放资源 channel.close(); connection.close(); } }
  • 消费者

    • 消费者1

      public class Consumer { @Test public void consume() throws Exception { // 1、获取连接对象 Connection connection = RabbitMQClient.getConection(); // 2、创建Channel Channel channel = connection.createChannel(); // 3、声明队列(一定要与生产者指定的队列相同) // 【改动1】:指定监听的队列ps-queue1 channel.queueDeclare("ps-queue1", true, false, false, null); // 4、开启监听队列并消费 DefaultConsumer consume = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到消息:" + new String(body, "UTF-8")); } }; // 【改动2】:指定消费的队列ps-queue1 channel.basicConsume("ps-queue1", true, consume); // 消费掉 System.in.read(); // 【保证不停止运行,一直监听】 // 5、释放资源 channel.close(); connection.close(); } }
    • 消费者2

      // 【改动1】:指定监听的队列ps-queue2 channel.queueDeclare("ps-queue1", true, false, false, null); // 【改动2】:指定消费的队列ps-queue2 channel.basicConsume("ps-queue1", true, consume); // 消费掉

3.3.4、方式四:Routing

image.png

特点

一个生产者,一个自己创建的交换机,两个队列,两个消费者

两个队列两个消费者,每个队列中可指定(精确指定)发不同的消息

原理

  • 生产者:
    • 创建一个DIRECT类型的exchange
    • 把多个队列绑定到交换机上(exchange)且指定不同的发送规则(精确匹配)
    • 发送消息绑定上交换机,并设定不同的发送规则
  • 消费者:指定要监听的队列

代码实现

  • 生产者

    public class Publisher { @Test public void publish() throws Exception { // 1、获取连接对象 Connection connection = RabbitMQClient.getConection(); // 2、创建Channel Channel channel = connection.createChannel(); //【改动1】:创建exchange channel.exhangeDeclare("routing-exchange", BuiltExchangeType.DIRECT) // 【改动2】绑定指定队列「队列,交换机,发送规则」 channel.queueBind("r-queue_error", "routing-exchange", "ERROR"); channel.queueBind("r-queue_info", "routing-exchange", "INFO"); // 3、发布消息到exchange,同时指定路由的规则 String msg = "Hello-World-msg!"; // 【改动3】设定发送规则 channel.basicPublish("routing-exchange", "ERROR", null, msg.getBytes()); channel.basicPublish("routing-exchange", "INFO", null, msg.getBytes()); System.out.println("生产者发布消息成功!"); // 4、释放资源 channel.close(); connection.close(); } }
  • 消费者

    • 消费者1

      public class Consumer { @Test public void consume() throws Exception { // 1、获取连接对象 Connection connection = RabbitMQClient.getConection(); // 2、创建Channel Channel channel = connection.createChannel(); // 3、声明队列(一定要与生产者指定的队列相同) // 【改动1】:指定监听的队列ps-queue1 channel.queueDeclare("r-queue_error", true, false, false, null); // 4、开启监听队列并消费 DefaultConsumer consume = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到消息:" + new String(body, "UTF-8")); } }; // 【改动2】:指定消费的队列ps-queue1 channel.basicConsume("r-queue_error", true, consume); // 消费掉 System.in.read(); // 【保证不停止运行,一直监听】 // 5、释放资源 channel.close(); connection.close(); } }
    • 消费者2

      // 【改动1】:指定监听的队列r-queue_info channel.queueDeclare("r-queue_info", true, false, false, null); // 【改动2】:指定消费的队列r-queue_info channel.basicConsume("r-queue_info", true, consume); // 消费掉

3.3.5、方式五:Topics

image.png

特点

一个生产者,一个自己创建的交换机,两个队列,两个消费者

两个队列两个消费者,每个队列中可指定(模糊指定)发不同的消息

原理

  • 生产者:
    • 创建一个TOPIC类型的exchange
    • 把多个队列绑定到交换机上(exchange)且指定不同的发送规则(模糊匹配)
    • 发送消息绑定上交换机,并设定不同的发送规则
  • 消费者:指定要监听的队列

代码实现

  • 生产者

    public class Publisher { @Test public void publish() throws Exception { // 1、获取连接对象 Connection connection = RabbitMQClient.getConection(); // 2、创建Channel Channel channel = connection.createChannel(); //【改动1】:创建exchange channel.exhangeDeclare("topic-exchange", BuiltExchangeType.TOPIC) // 【改动2】绑定指定队列「队列,交换机,发送规则」*:1个xxx #:xxx.xxx.xxx.... channel.queueBind("topic-queue_color", "topic-exchange", "color.*"); channel.queueBind("topic-queue_speed", "topic-exchange", "speed.#"); // 3、发布消息到exchange,同时指定路由的规则 String msg = "Hello-World-msg!"; // 【改动3】设定发送规则 channel.basicPublish("routing-exchange", "color.red", null, msg.getBytes()); channel.basicPublish("routing-exchange", "speed.fast.red", null, msg.getBytes()); System.out.println("生产者发布消息成功!"); // 4、释放资源 channel.close(); connection.close(); } }
  • 消费者

    • 消费者1

      public class Consumer { @Test public void consume() throws Exception { // 1、获取连接对象 Connection connection = RabbitMQClient.getConection(); // 2、创建Channel Channel channel = connection.createChannel(); // 3、声明队列(一定要与生产者指定的队列相同) // 【改动1】:指定监听的队列ps-queue1 channel.queueDeclare("r-queue_color", true, false, false, null); // 4、开启监听队列并消费 DefaultConsumer consume = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到消息:" + new String(body, "UTF-8")); } }; // 【改动2】:指定消费的队列ps-queue1 channel.basicConsume("r-queue_color", true, consume); // 消费掉 System.in.read(); // 【保证不停止运行,一直监听】 // 5、释放资源 channel.close(); connection.close(); } }
    • 消费者2

      // 【改动1】:指定监听的队列r-queue_info channel.queueDeclare("r-queue_speed", true, false, false, null); // 【改动2】:指定消费的队列r-queue_info channel.basicConsume("r-queue_speed", true, consume); // 消费掉

3.4、延时队列

3.4.1、TTL

概述

【全称】:Time To Live(存活时间/过期时间)

【特点】1、当消息到达存活时间后,还没有被消费,会被自动清除

​ 2、RabbitMQ可以对消息设置过期时间,也可以为整个队列设置过期时间

​ 3、消息过期后,只有在队列顶端,才会判断其是否过期(避免轮询判断消耗性能)

代码实现

  • 队列的统一过期

    // 2、创建Channel Channel channel = connection.createChannel(); // 设置队列过期时间 Map<String, Object> args = new HashMap<String, Object>(); args.put("x-message-ttl", 60000); // 创建队列:queueName, durable, exclusive, autoDelete, args channel.queueDeclare("publish-queue1", true, false, false, args);
  • 消息的过期

    // 3、发布消息到exchange,同时指定路由的规则 String msg = "Hello-World!"; // 设置过期时间 AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("5"); AMQP.BasicProperties properties = builder.build(); channel.basicPublish("", "Hello World", properties, msg.getBytes());

3.4.2、死信队列

概述

【全称】:DLX , Dead Letter Exchange(私信交换机)

【概述】:当消息成为Dead Message 后,可以被重新发送到另一个交换机上,这个交换机就是DLX

image.png

【消息成为内死信的三种情况】

​ 1、队列消息长度达限制

​ 2、消费者拒接消息(basicNack/basicReject),并不把消息重新放回原队列(requeue=false)

​ 3、消息超时未被消费

代码实现

// 1、创建普通Exchange channel.exchangeDeclare(exchangeName, "topic", true, false, null); // 2、创建队列,绑定到1上和3上 Map<String, Object> agruments = new HashMap<String, Object>(); agruments.put("x-dead-letter-exchange", routingKey1);// routingKey1:指定2-->3的规则 channel.queueDeclare(queueName, true, false, false, agruments); channel.queueBind(queueName, exchangeName, routingKey2); // routingKey2:指定1-->2的规则 // 3、创建死信Exchange,绑定到2上 channel.exchangeDeclare(exchangeName2, "topic", true, false, null); // 4、创建普通队列,绑定到3上 channel.queueDeclare(queueName2, true, false, false, null); channel.queueBind(queueName2, exchangeName2, routingKe3);// routingKey3:指定3-->4的规则

3.4.3、延时队列

概述

【概述】:消息进入队列中并不会被消费,之后到达指定时间后,才会被消费。

【应用场景】:1、下单后,30分钟未支付,取消订单

​ 2、新注册用户成功7天后,发送短信问候

【解决方案对比】:1、定时器:轮询查询数据库检测是否支付,十分消耗性能,不优雅

​ 2、延时队列:只需查询一次数据库,优雅

【延时队列原理】:使用TTL + 死信队列组合实现延时队列

image.png

代码实现

// 【DLX 创建死信队列】 // 1、创建普通Exchange channel.exchangeDeclare(exchangeName, "topic", true, false, null); // 2、创建队列,绑定到1上和3上 Map<String, Object> agruments = new HashMap<String, Object>(); agruments.put("x-dead-letter-exchange", routingKey1);// routingKey1:指定2-->3的规则 channel.queueDeclare(queueName, true, false, false, agruments); channel.queueBind(queueName, exchangeName, routingKey2); // routingKey2:指定1-->2的规则 // 3、创建死信Exchange,绑定到2上 channel.exchangeDeclare(exchangeName2, "topic", true, false, null); // 4、创建普通队列,绑定到3上 channel.queueDeclare(queueName2, true, false, false, null); channel.queueBind(queueName2, exchangeName2, routingKe3);// routingKey3:指定3-->4的规则 // 【TTL 发消息设置过期时间】 String msg = "Hello-World!"; // 设置过期时间 AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("1800000"); AMQP.BasicProperties properties = builder.build(); channel.basicPublish(exchangeName2, "Hello World", properties, msg.getBytes());

四、RabbitMQ的问题解决

4.1、防止消息丢失(消息可靠性)

4.1.1、丢失场景

可能有如下4种场景会导致消息丢失

  • 【场景1】:生产者发送消息时,网络等问题,导致未发送到MQ的Exchange
  • 【场景2】:Exchange到Queue过程中MQ突然宕机了
  • 【场景3】:消息已经到达MQ中的Queue,但是MQ突然宕机了
  • 【场景4】:消费者消费消息时,取出了消息,还未处理完对应的业务,消费者宕机了

image.png

4.1.2、解决【场景1】

【场景1】:生产者发送消息时,网络等问题,导致未发送到MQ的Exchange

【解决方法】:使用事务/Comfirm

事务操作

【功能】:事务可以保证消息100%传递,可以通过事物的回滚去记录到日志中,后面定时再次发送当前消息。

【弊端】:事务效率太低,比平时操作效率低上100倍以上。

Comfirm方式

【功能】:和RabbitMQ事务功能一样,并且效率高很多

  • 普通Comfirm方式

    // 3、发布消息到exchange,同时指定路由的规则 // 3.1 开启confirm channel.confirmSelect(); // 3.2 发送消息 String msg = "Hello-World!"; channel.basicPublish("", "Hello World", null, msg.getBytes()); // 3.3 判断是否发送成功 if(channel.waitForConfirms()){ System.out.println("发送成功"); }else{ System.out.println("发送失败, 执行写入日志文件/数据库等业务操作"); }
  • 批量Comfirm方式

    // 3、发布消息到exchange,同时指定路由的规则 // 3.1 开启confirm channel.confirmSelect(); // 3.2 发送消息 for (int i=0; i<1000; i++){ String msg = "Hello-World!" + i; channel.basicPublish("", "Hello World", null, msg.getBytes()); } // 3.3 判断是否全部成功:当发送的消息有1个失败,则全部失败,抛出异常IOException channel.waitForConfirmsOrDie();
  • 异步Comfirm方式

    发送消息和发送结果返回是异步操作的,所以,发送结果返回不影响消息的继续发送

    // 3、发布消息到exchange,同时指定路由的规则 // 3.1 开启confirm channel.confirmSelect(); // 3.2 发送消息 for (int i=0; i<1000; i++){ String msg = "Hello-World!" + i; channel.basicPublish("", "Hello World", null, msg.getBytes()); } // 3.3 异步发送 channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long l, boolean b) throws IOException { System.out.println("消息发送成功,标识:" + l + "是否是批量:" + b); } @Override public void handleNack(long l, boolean b) throws IOException { System.out.println("消息发送失败,标识:" + l + "是否是批量:" + b); } }); // 由于是异步,方便看效果 // System.in.read();

4.1.3、解决【场景2】

【场景2】:Exchange到Queue过程中MQ突然宕机了

【解决方法】:使用return机制

return机制

// 2、创建Channel Channel channel = connection.createChannel(); // 开启return机制 channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException { // 当消息没有送达队列中才会执行 System.out.println(new String(bytes, "UTF-8") + "没有送到队列中"); } }); // 3、发布消息到exchange,同时指定路由的规则 // 3.1 开启confirm channel.confirmSelect(); // 3.2 发送消息 for (int i=0; i<1000; i++){ String msg = "Hello-World!" + i; // 【这里要设置为true才行(默认是false)】 channel.basicPublish("", "Hello World", true, null, msg.getBytes()); }

4.1.4、解决【场景3】

【场景3】:消息已经到达MQ中的Queue,但是MQ突然宕机了

【解决方法】:持久化(RabbitMQ提供了持久化的功能)

持久化操作

  • 设置交换器的持久化

    // 三个参数分别为 交换器名、交换器类型、是否持久化 channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
  • 设置队列的持久化

    // 参数1 queue :队列名 // 参数2 durable :是否持久化 // 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除 // 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列 // 参数5 arguments channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  • 设置消息的持久化

    // 参数1 exchange :交换器 // 参数2 routingKey : 路由键 // 参数3 props : 消息的其他参数,其中 MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化 // 参数4 body : 消息体 channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

4.1.5、解决【场景4】

【场景4】:消费者消费消息时,取出了消息,还未处理完对应的业务,消费者宕机了

【解决方法】:设置手动ACK即可(取出消息后等到业务理完毕,在手动执行ACK)

手动ACK

// 4、开启监听队列并消费 DefaultConsumer consume = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1号,接收到消息:" + new String(body, "UTF-8")); // 【改动2】手动ACK try{ // 手动ACK channel.basicAck(envelope.getDeliveryTag(), false); }catch (Exception e) { //重新放入队列 //channel.basicNack(envelope.getDeliveryTag(), false, true); //抛弃此条消息 //channel.basicNack(envelope.getDeliveryTag(), false, false); e.printStackTrace(); } } }; //【改动2】:false(换为手动ACK) channel.basicConsume("Work", false, consume); // 消费掉

4.2、防止重复消费(保障幂等性)

原因

消费者1在消费此消息,还没给rabbitmq一个ack,该消息就又被消费者2拿到进行消费

解决原理

  • 1、生产者发送消息时,生成惟一的msgId,与消息一起发送
  • 2、消费者1拿到消息,把msgId:0(0代表还未消费完),以setnx+setex的形式放入redis
  • 3、消费者1消费完消息,把msgId:1(1代表消费完毕),以setex的形式放入redis,并手动ack
  • 4、消费者2拿到同样的消息,把msgId:0以setnx+setex的形式放入redis,会失败
  • 5、消费者2 get(msgId),进行判断:如果是0,说明1在消费,直接返回;如果是1,说明1已经消费完,做手动ack。

image.png

代码实现

  • 生产者

    发送消息时,指定消息id

    // 3.2 发送消息 // 添加唯一标识 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(1) // 指定是否持久化到消息队列中(1:是;2:否) .messageId(UUID.randomUUID().toString()).build(); String msg = "Hello-World!"; // 存入messageId channel.basicPublish("", "Hello World", true, properties, msg.getBytes());
  • 消费者

    消费消息时,根据逻辑操作redis,用于ack

    // 4、开启监听队列 // 创建消费者,并重写handleDelivery方法,对消息进行处理 DefaultConsumer consume = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { Jedis jedis = new Jedis("localhost", 6379); String messageId = properties.getMessageId(); // 获得messageId // 1、setnx到redis中,默认指定value为0, (NX:setnx EX:setex)过期时间为10s String result = jedis.set(messageId, "0", "NX", "EX", 10); if(result != null && result.equalsIgnoreCase("OK")){ System.out.println("接收到消息" + new String(body, "UTF-8")); // 2、消费成功,set messageId 1 jedis.set(messageId, "1", "EX", 10); // 存在该key直接覆盖 channel.basicAck(envelope.getDeliveryTag(), false); }else{ // 3、如过1中setnx失败,获取key对应的value,0:啥也不做,如果是1:指定ack回调 String s = jedis.get(messageId); if("1".equalsIgnoreCase(s)){ channel.basicAck(envelope.getDeliveryTag(), false); } } } };

4.3、防止消息乱序

Rabbitmq如何保证消息顺序执行

4.4、解决消息积压

MQ消息积压问题如何解决?

TTL、死信队列、延迟队列、

五、RabbitMQ整合SpringBoot

5.1、快速实现

导入依赖

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

编写配置文件

spring: rabbitmq: host: localhost port: 5672 username: test password: test virtual-host: /test # 下面根据具体需求再配置 listener: simple: acknowledge-mode: manual # 手动ACK concurrency:1 # 最小消费者数量 max-concurrency:10 # 最大消费者数量 prefetch: 1 # unack的最大数量(限流) publisher-confirm-type: simple # confirm(防丢失) publisher-returns: true # return(防丢失)

编写RabbitMQ的配置类:方便统一管理

public class RabbitMQConfig{ // 1、创建exchange-topic @Bean public TopicExchange getTopicExchange(){ // Exchange名、持久化、不自动删除 return new TopicExchange("boot-topic-exchange", true, false); } // 2、创建队列 @Bean public Queue getQueue(){ // 队列名、持久化、排外、不自动删除、其他 return new Queue("boot-queue", true, false, false, null); } // 3、绑定exchange和队列、指定发送规则 @Bean public Binding getBinding(TopicExchange topicExchange, Queue queue){ return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*"); } }

生产者

@Autowired private RabbitTemplate rabbitTemplate; // 发送消息 rabbitTemplate.convertAndSend("boot-topic-exchange", "slow.red.rabbit", "慢红兔");

消费者

@RabbitListener(queues = "boot-queue") public void getMassage(Object massage){ // 可以获得RoutingKey(*.red.*) String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey() System.out.println("接收到队列" + massage); }

5.2、消息丢失:Comfirm&Return

编写配置文件

spring: rabbitmq: host: localhost port: 5672 username: test password: test virtual-host: /test listener: simple: acknowledge-mode: manual publisher-confirm-type: simple publisher-returns: true

加个PublisherConfirmAndReturnConfig

Comfirm:生产者发送消息时,网络等问题,导致未发送到MQ的Exchange

Return:Exchange到Queue过程中MQ突然宕机了

@Component public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct // 创建对象时,会执行该方法 public void initMethod(){ rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean b, String s) { if (b){ System.out.println("消息已经送达exchange"); }else{ System.out.println("消息未送达到exchange"); } } @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println("消息未送达到queue中"); } }

5.3、消息丢失:持久化

只需要定义 exchange 和 queue 的持久化。

持久化操作

  • 直接在配置类中指定即可

    public class RabbitMQConfig{ // 1、创建exchange-topic @Bean public TopicExchange getTopicExchange(){ // Exchange名、【持久化】、不自动删除 return new TopicExchange("boot-topic-exchange", true, false); } // 2、创建队列 @Bean public Queue getQueue(){ // 队列名、【持久化】、排外、不自动删除、其他 return new Queue("boot-queue", true, false, false, null); } // 3、绑定exchange和队列、指定发送规则 @Bean public Binding getBinding(TopicExchange topicExchange, Queue queue){ return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*"); } }

5.4、消息丢失:手动ACK

默认是消费完自动ACK(确认消费),手动ACK是消费完后不确认消费,在合适位置设定确认消费

编写配置文件

spring: rabbitmq: host: localhost port: 5672 username: test password: test virtual-host: /test listener: simple: acknowledge-mode: manual

在消费消息位置,设置手动ack

@RabbitListener(queues = "boot-queue") public void getMassage(String msg, Channel channel, Message message) throws IOException { System.out.println("接收到队列" + msg); // 手动ack try{ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e){ //重新放入队列 //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); //抛弃此条消息 //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); e.printStackTrace(); } }

5.5、消息重复

  • 生产者

    发送消息时,指定消息id

    // 发送消息 CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("boot-topic-exchange", "slow.red.rabbit", "慢红兔", messageId);
  • 消费者

    消费消息时,根据逻辑操作redis,用于ack

    @Component public class Consumer { @Autowired private StringRedisTemplate redisTemplate; @RabbitListener(queues = "boot-queue") public void getMassage(String msg, Channel channel, Message massage){ // 0、获取messageId String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation"); // 1、设置key到redis(如果不存(未被消费)在则进入2,存在(已被消费)进入5) if (redisTemplate.opsForValue().setIfAbsent(messageId, "0", 10, TimeUnit.SECONDS)){ // 2、消费消息 System.out.println("接收到消息:" + msg); // 3、设置key的值为为1(覆盖设置) redisTemplate.opsForValue().set(messageId, "1", 10, TimeUnit.SECONDS); // 4、手动ACK channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }else { // 5、获取key的值,为1说明已经消费完毕,但是没有ACK,直接手动ACK即可;为0说明在消费,啥也不用做 if (StringUtils.equals("1", redisTemplate.opsForValue().get(messageId))){ // 6、手动ACK channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } } }

5.6、TTL、DLX、延时队列

TTL

  • 队列的统一过期

    // 2、创建队列 @Bean public Queue getQueue(){ // 设置队列过期时间 Map<String, Object> args = new HashMap<String, Object>(); args.put("x-message-ttl", 60000); // 队列名、持久化、排外、不自动删除、其他 return new Queue("boot-queue", true, false, false, args); }
  • 消息过期

    // 3、发布消息到exchange,同时指定路由的规则 String msg = "慢红兔"; // 设置过期时间 MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("5000"); return message; } }; // 发送消息 rabbitTemplate.convertAndSend("boot-topic-exchange", "slow.red.rabbit", msg, messagePostProcessor);

DLX

@Configuration public class RabbitMQConfig { // 1、创建创建普通Exchange @Bean public TopicExchange getTopicExchange(){ // Exchange名、持久化、不自动删除 return new TopicExchange(exchangeName, true, false); } // 2、创建中间队列,并绑定到死信交换机上(2-->3) @Bean public Queue getQueue(){ Map<String, Object> agruments = new HashMap<String, Object>(); agruments.put("x-dead-letter-exchange", routingKey1); // 队列名、持久化、排外、不自动删除、其他 return new Queue(queueName, true, false, false, agruments);// 创建并绑定死信交换机 } // 3、创建普通队列 @Bean public Queue getQueue2(){ // 队列名、持久化、排外、不自动删除、其他 return new Queue(queueName2, true, false, false, null); } // 4、绑定中间队列到普通exchange(1--->2) @Bean public Binding getBinding(TopicExchange topicExchange, Queue queue){ return BindingBuilder.bind(queue).to(topicExchange).with(routingKey2); } // 5、绑定普通队列到死信交换机(3--->4) @Bean public Binding getBinding2(TopicExchange topicExchange, Queue queue){ return BindingBuilder.bind(queue).to(topicExchange).with(routingKey3); } }

延时队列

  • 创建死信队列(DLX)

    // 如DLX的 "RabbitMQConfig"
  • 生产者发送消息指定过期时间(TTL)

    // 如TTL的 "消息过期"

六、RabbitMQ集群

没有集群搭建的实践,故此处仅做架构描述

6.1、集群架构

主备模式:主节点挂了,子节点依旧可以提供服务

  • 主备节点共享数据
  • 主节点挂掉,备份节点变为主节点,挂掉的主节点恢复后将变为备份节点
image.png

远程模式:跨地域的两个mq集群互联,可以实现双活

  • 当上游集群挂掉/压力过大,可以转接到下游集群
  • 配置复杂,用的不多
image.png

镜像模式:数据同步,保证数据100%不丢失

  • 较为常用的一种模式

  • 高可靠(某个节点挂掉,其他节点依旧可以提供服务)

  • 不利于横向扩容(每个节点数据一致,横向扩容没有意义,反而增加数据同步的负担),建议奇数个节点(3个为最小奇数集群节点)

image.png

多活模式:异地多活+数据共享,解决镜像模式无法横向扩容的缺点

  • 依赖RabbitMQ的federation插件,可以实现持续可靠的AMQP数据通信
  • 不需要写复杂配置文件,可不停机配置
  • 在多套数据中心中各部署一套RabbitMQ集群(异地多活),各中心的RabbitMQ除了提供正常业务服务外,还会实现部分队列消息共享(数据共享)。
image.png

七、应用场景及解决方案

6.1、应用解耦&异步提速

【不使用MQ】

所有应用同步操作,一方面比较耗时,另一方面,一旦其中一个应用挂掉,所有应用则无法继续使用

image.png

【使用MQ】

应用解耦,所有应用异步操作,减少时耗、避免一个应用挂掉导致所有应用不能使用

image.png

6.2、通知(消息发布订阅)

在使用websocket实现跨服务器聊天时,可把Redis换为MQ,使用Hello World的通信方式。

websocket.png

image.png

6.3、削峰填谷

上下游系统 处理能力存在差距的时候,利用 MQ 做一个 “漏斗” 模型,进行 流控。把 MQ 当成可靠的 消息暂存地,进行一定程度的 消息堆积;在下游有能力处理的时候,再发送消息。

MQ 的流量削峰常用于高并发场景(例如:秒杀、团抢等业务场景),它是缓解瞬时暴增流量的核心手段之一。

【解决方案】: 消费端限流

1、添加rabbitmq,并设置手动ACK方式

2、适当添加Thread.sleep(100);

3、配置文件设置spring.rabbitmq.listener.simple.prefetch=1 //unack的最大数量

image.png

参考:设计秒杀应用

6.4、数据分发

使用Routing或Topic的通信方式进行数据分发

image.png

image.png

6.5、延时任务

我们经常会遇到如下场景

1、定时器:轮询查询数据库检测是否支付,十分消耗性能,不优雅

2、延时队列:只需查询一次数据库,优雅

如果使用定时器去做,将会不断地轮询查询数据库,性能较差,十分不优雅,而RabbitMQ可以使用TTL+DLX实现延时队列,完成定时任务的需求。

image.png

推荐阅读:Python操作RabbitMQ简单使用

最后修改时间:2023-05-25 10:03:04
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论