使用RabbitMQ 可以是通过
rabbit 提供的amqp-client
spring 提供的 spring-rabbit
springboot 提供的 spring-boot-start-amqp
Rabbit MQ 7中用法+ 3种高级用法
Rabbit MQ 7中用法:
简单模式(Hello world)
工作队列模式(Work Queue)
广播(Publish-Subcribe)
路由模式(Routing)
Topic 模式
RPC模式
publish Confirm模式



简单模式(Hello world):
生产者将消息发送给默认的交换机(exchange = “”),代码中发送消息的时候,无需指定 exchange 只需要指定 queue. 消费者监听指定的消费者。
工作队列模式(Work Queue)
生产者将消息发送给默认的交换机(exchange = “”),代码中发送消息的时候,无需指定 exchange 只需要指定 queue. 消费者监听指定的消费者。
区别于简单模式(hello world)只是消费者的数量的区别。
在消费者指定 prefetch = 1 的条件下,且消费了消息,返回了 ACK 的情况下,消息只会发给一个消费者,如果被消费就不会发给另外的消费者,出发接受消息的消费者拒收消息。
广播(Publish-Subcribe)
生产者将消息发送给指定的交换机(fanout type),不需要指定 queue. 消费者监听指定的消费者。
路由模式(Routing)
生产者将消息发送给指定的交换机(direct type),不需要指定 queue, 消息会根据 Routing Key将消息路由给对应的消息队列(Queue). 消费者监听指定的消费者。
Topic 模式
生产者将消息发送给指定的交换机(topic type),不需要指定 queue, 消息会根据 Routing Key将消息通过 通配符的方式路由给对应的消息队列(Queue). 消费者监听指定的消费者。
注意:* 可以代表一个单词# 可以代表多个单词aaa.* 匹配 aaa.bbbaaa.# 匹配 aaa.bbb 和 aaa.bbb.ccc
RPC模式
RPC 方式不推荐,如下是官方说明
我们的代码仍然非常简单,并没有尝试解决更复杂(但重要)的问题,例如:如果没有服务器在运行,客户端应该如何反应?客户端是否应该为 RPC 设置某种超时?如果服务器发生故障并引发异常,是否应该将其转发给客户端?在处理之前防止无效的传入消息(例如检查边界)
Publish Confirm模式
生产者发送消息,可以监听回调,回调有两种 Confirm 和 Return
消息发送给交换机成功/失败会有 confirm 回调, 如果消息发往队列成功失败会有 return 回调。

3种高级用法:
TTL 队列
延时队列
死信队列
TTL队列消息:
TTL队列消息有时效,超过有效时间,就会销毁。
延时队列消息:
延时队列消息,生产者将消息成功发往Queue后一定时间后,才会被消费者监听到。
死信队列消息:
死信队列其实也是一只队列,只是用处是用来装那些不能消费的消息,或者特殊情况下的消息。
消息会在如下3中情况进入死信队列
消息被拒收(basicNack)/ basicReject 并且不将消息方会原队列(requeue=false).
队列中消息数量超过队列上限。
原消息队列设置了过期时间,消息过期未被消费,进入死信队列(正常情况)。

下面是代码演示
Connection
public class RabbitMqUtils {private static ConnectionFactory connectionFactory = new ConnectionFactory();static {connectionFactory.setHost("124.111.11.11");connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号connectionFactory.setUsername("conan");connectionFactory.setPassword("conan");connectionFactory.setVirtualHost("/conan");}public static Connection getConnection(){Connection conn = null;try {conn = connectionFactory.newConnection();return conn;} catch (Exception e) {throw new RuntimeException(e);}}}
Hello world 模式 生产者
Connection connection = RabbitMqUtils.getConnection();Channel channel = connection.createChannel();//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列//第一个参数:队列名称ID//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列//其他额外的参数, nullchannel.queueDeclare("hello_world_test", false, false, false, null);String msg = "hello world rabbit mq";//四个参数//exchange 交换机//队列名称 ( hello world 模式下 虽然是显示是路由key 其实是 队列名称)//额外的设置属性//最后一个参数是要传递的消息字节数组channel.basicPublish("", "hello_world_test", null, msg.getBytes());channel.close();connection.close();
Hello world 模式 消费者
{ Connection connection = RabbitMqUtils.getConnection();Channel channel = connection.createChannel();//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列//第一个参数:队列名称ID//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列//其他额外的参数, nullchannel.queueDeclare("hello_world_test", false, false, false, null);channel.basicConsume("hello_world_test",new MyDefaultConsumer(channel)); }class MyDefaultConsumer extends DefaultConsumer{private Channel channel;/*** Constructs a new instance and records its association to the passed-in channel.* @param channel the channel to which this consumer is attached*/public MyDefaultConsumer(Channel channel) {super(channel);this.channel =channel;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body);System.out.println("消费者接收到的消息:"+message);System.out.println("消息的TagId:"+envelope.getDeliveryTag());//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息channel.basicAck(envelope.getDeliveryTag(), false);super.handleDelivery(consumerTag, envelope, properties, body);}}
工作队列模式 生产者
public static void main(String[] args) throws IOException, TimeoutException {/*** 工作队列模式 work_queue 其实和 hello world 相似 都没有交换机 exchange(默认有个虚拟的交换机)** 区别:hello world 只有一个消费者(虚拟交换机绑定一个队列), work queue 有多个消费者(虚拟交换机绑定多个队列)*/Connection connection = RabbitMqUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("work_queue_test", false, false, false, null);for (int i = 1; i <= 100; i++) {String msg = "工作队列模式 消息---》 " + i;channel.basicPublish("", "work_queue_test", null, msg.getBytes());}channel.close();connection.close();}
工作队列模式 消费者
public class ConsumerA {public static void main(String[] args) throws IOException {Connection connection = RabbitMqUtils.getConnection();Channel channel = connection.createChannel();// 每次只去一个处理channel.basicQos(1);channel.queueDeclare("work_queue_test",false,false,false,null);channel.basicConsume("work_queue_test", new PublishDefaultConsumer(channel));}}
Topic 模式生产者
public class Producer {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = RabbitMqUtils.getConnection();Channel channel = connection.createChannel();/*** 开启监听模式** confirm 和 return 都是 生产者 和 broker 之间的关系, 和消费者无关.*/channel.confirmSelect();// 监听消息是否到达 exchangechannel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.printf("消息到达了 exchange " + deliveryTag);}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.printf("消息不能到达了 exchange " + deliveryTag);}});// 监听消息是否到达 队列/*** 在 topic 的模式下 如果消息 路由key 找不到指定的 队列, 就会存在被 return的情况** mandatory 必须设置成 true*/channel.addReturnListener(new ReturnCallback() {@Overridepublic void handle(Return returnMessage) {System.out.printf("消息被退回 return ");}});String msg = "确认消息 aaa.aaa";String msg2 = "到不了的消息 aaa.bbb.aaa";/*** mandatory true代表如果消息无法正常投递则return回生产者,* 如果false,则直接将消息放弃。*/channel.basicPublish("exchange_topic", "aaa.aaa", true, null, msg.getBytes());channel.basicPublish("exchange_topic", "aaa.bbb.aaa", true, null, msg2.getBytes());// Thread.sleep(5000);// channel.close();// connection.close();}}
Topic 模式消费者
public class ConsumerA {public static void main(String[] args) throws IOException {Connection connection = RabbitMqUtils.getConnection();Channel channel = connection.createChannel();// 消费者指点自己的 队列channel.queueDeclare("queue_exchange_topic", false, false, false, null);// 队列绑定 交换机 exchangechannel.queueBind("queue_exchange_topic", "exchange_topic", "aaa.*");// 每次取一个处理channel.basicQos(1);channel.basicConsume("queue_exchange_topic", new PublishDefaultConsumer(channel));}}
交换机的 fanout, direct, topic 模式差不多还是自己试试看吧。




