
RabbitMQ 主要解决模块之间耦合度过高问题。
安装 RabbitMQ。
linux 下使用 docker-compose 安装。
# docker-compose.ymlversion: "3.1"services:rabbitmq:image: daocloud.io/library/rabbitmq:managementrestart: alwayscontainer_name: rabbitmqports:- 5672:5672- 15672:15672volumes:- ./data:/var/lib/rabbitmq
打开可视化操作界面。http://10.36.144.157:15672/,ip 换为自己的。登录账户和密码是 guest,然后创建账户和 virtual。

架构介绍。
Publisher,生产者,发布消息到RabbitMQ中的 Exchange 。
Consumer ,消费者,监听RabbitMQ中的Queue中的消息。
Exchange,交换机,和生产者建立连接并接收生产者的消息。
Queue,队列,Exchange会将消息分发到指定的 Queue,Queue和消费者进行交互。
Routes,路由,交换机以什么样的策略将消息发布到 Queue。
maven 项目中使用。

Hello-World。一方发消息,一方收消息。
// pom.xml<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency>
// RabbitConfig.javapublic class RabbitConfig {public static Connection getConnection() {ConnectionFactory factory = new ConnectionFactory();factory.setHost("10.36.144.157");factory.setPort(5672);factory.setUsername("test");factory.setPassword("test");factory.setVirtualHost("/virtual2");Connection connection = null;try {connection = factory.newConnection();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}return connection;}}
//public class Consume {@Testpublic void testConsume() throws IOException, TimeoutException {// 消费者Connection connection = RabbitConfig.getConnection();Channel channel = connection.createChannel();String queue = "queue";/*指定队列的名称当前队列是否需要持久化是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)如果这个队列没有消费者在消费,队列自动删除指定当前队列的其他信息*/channel.queueDeclare(queue, true, false, false, null);// 监听DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收: " + new String(body, "UTF-8"));}};/*指定消费哪个队列指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)指定消费回调*/channel.basicConsume(queue, true, consumer);// 一直运行System.in.read();channel.close();connection.close();}}//public class Publish {@Testpublic void testPublish() throws IOException, TimeoutException {// 生产者Connection connection = RabbitConfig.getConnection();Channel channel = connection.createChannel();String queue = "queue";String msg = "消息!";/*指定exchange指定路由的规则, 使用具体的队列名称指定传递的消息所携带的properties指定发布的具体消息*/channel.basicPublish("", queue, null, msg.getBytes());System.out.println("生产者发布消息成功");channel.close();connection.close();}}
Work。根据能力接收消息的多少。
//public class Consume1 {@Testpublic void testConsume() throws IOException, TimeoutException {// 消费者Connection connection = RabbitConfig.getConnection();Channel channel = connection.createChannel();// 指定当前消费者,一次消费多少个消息channel.basicQos(1);String queue = "queue";/*指定队列的名称当前队列是否需要持久化是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)如果这个队列没有消费者在消费,队列自动删除指定当前队列的其他信息*/channel.queueDeclare(queue, true, false, false, null);// 监听DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}/*接收: 消息! 0*/System.out.println("接收: " + new String(body, "UTF-8"));// 手动 ackchannel.basicAck(envelope.getDeliveryTag(), false);}};/*指定消费哪个队列指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)指定消费回调*/channel.basicConsume(queue, false, consumer);// 一直运行System.in.read();channel.close();connection.close();}}//public class Consume2 {@Testpublic void testConsume() throws IOException, TimeoutException {// 消费者Connection connection = RabbitConfig.getConnection();Channel channel = connection.createChannel();channel.basicQos(1);String queue = "queue";/*指定队列的名称当前队列是否需要持久化是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)如果这个队列没有消费者在消费,队列自动删除指定当前队列的其他信息*/channel.queueDeclare(queue, true, false, false, null);// 监听DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/*接收: 消息! 1接收: 消息! 2接收: 消息! 3接收: 消息! 4接收: 消息! 5接收: 消息! 6接收: 消息! 7接收: 消息! 8接收: 消息! 9*/System.out.println("接收: " + new String(body, "UTF-8"));// 手动 ackchannel.basicAck(envelope.getDeliveryTag(), false);}};/*指定消费哪个队列指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)指定消费回调*/channel.basicConsume(queue, false, consumer);// 一直运行System.in.read();channel.close();connection.close();}}//public class Publish {@Testpublic void testPublish() throws IOException, TimeoutException {// 生产者Connection connection = RabbitConfig.getConnection();Channel channel = connection.createChannel();String queue = "queue";/*指定exchange指定路由的规则, 使用具体的队列名称指定传递的消息所携带的properties指定发布的具体消息*/for (int i = 0; i < 10; i++) {String msg = "消息! " + i;channel.basicPublish("", queue, null, msg.getBytes());}System.out.println("生产者发布消息成功");channel.close();connection.close();}}
Publish/Subscribe。向绑定的所有队列发同样的消息。
//public class Consume1 {@Testpublic void testConsume() throws IOException, TimeoutException {// 消费者Connection connection = RabbitConfig.getConnection();Channel channel = connection.createChannel();// 指定当前消费者,一次消费多少个消息channel.basicQos(1);String queue1 = "queue1";/*指定队列的名称当前队列是否需要持久化是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)如果这个队列没有消费者在消费,队列自动删除指定当前队列的其他信息*/channel.queueDeclare(queue1, true, false, false, null);// 监听DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}/*接收: 消息! 0接收: 消息! 1接收: 消息! 2接收: 消息! 3接收: 消息! 4接收: 消息! 5接收: 消息! 6接收: 消息! 7接收: 消息! 8接收: 消息! 9*/System.out.println("接收: " + new String(body, "UTF-8"));// 手动 ackchannel.basicAck(envelope.getDeliveryTag(), false);}};/*指定消费哪个队列指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)指定消费回调*/channel.basicConsume(queue1, false, consumer);// 一直运行System.in.read();channel.close();connection.close();}}//public class Consume2 {@Testpublic void testConsume() throws IOException, TimeoutException {// 消费者Connection connection = RabbitConfig.getConnection();Channel channel = connection.createChannel();String queue2 = "queue2";/*指定队列的名称当前队列是否需要持久化是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)如果这个队列没有消费者在消费,队列自动删除指定当前队列的其他信息*/channel.queueDeclare(queue2, true, false, false, null);// 监听DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/*接收: 消息! 0接收: 消息! 1接收: 消息! 2接收: 消息! 3接收: 消息! 4接收: 消息! 5接收: 消息! 6接收: 消息! 7接收: 消息! 8接收: 消息! 9*/System.out.println("接收: " + new String(body, "UTF-8"));// 手动 ackchannel.basicAck(envelope.getDeliveryTag(), false);}};/*指定消费哪个队列指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)指定消费回调*/channel.basicConsume(queue2, false, consumer);// 一直运行System.in.read();channel.close();connection.close();}}//public class Publish {@Testpublic void testPublish() throws IOException, TimeoutException {// 生产者Connection connection = RabbitConfig.getConnection();Channel channel = connection.createChannel();String queue1 = "queue1";String queue2 = "queue2";String exchange = "exchange";// 创建 交换机channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT);// 绑定队列channel.queueBind(queue1, exchange, "");channel.queueBind(queue2, exchange, "");/*指定exchange指定路由的规则, 使用具体的队列名称指定传递的消息所携带的properties指定发布的具体消息*/for (int i = 0; i < 10; i++) {String msg = "消息! " + i;channel.basicPublish(exchange, "", null, msg.getBytes());}System.out.println("生产者发布消息成功");channel.close();connection.close();}}
Routing。向 RoutingKey 一致的队列发消息。
//public class Consume1 {@Testpublic void testConsume() throws IOException, TimeoutException {// 消费者Connection connection = RabbitConfig.getConnection();Channel channel = connection.createChannel();// 指定当前消费者,一次消费多少个消息channel.basicQos(1);String queue1 = "queue1";/*指定队列的名称当前队列是否需要持久化是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)如果这个队列没有消费者在消费,队列自动删除指定当前队列的其他信息*/channel.queueDeclare(queue1, true, false, false, null);// 监听DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}/*接收: routingKey1*/System.out.println("接收: " + new String(body, "UTF-8"));// 手动 ackchannel.basicAck(envelope.getDeliveryTag(), false);}};/*指定消费哪个队列指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)指定消费回调*/channel.basicConsume(queue1, false, consumer);// 一直运行System.in.read();channel.close();connection.close();}}//public class Consume2 {@Testpublic void testConsume() throws IOException, TimeoutException {// 消费者Connection connection = RabbitConfig.getConnection();Channel channel = connection.createChannel();String queue2 = "queue2";String exchange = "exchange";// 创建 交换机channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT);/*指定队列的名称当前队列是否需要持久化是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)如果这个队列没有消费者在消费,队列自动删除指定当前队列的其他信息*/channel.queueDeclare(queue2, true, false, false, null);// 监听DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/*接收: routingKey2接收: routingKey2接收: routingKey2*/System.out.println("接收: " + new String(body, "UTF-8"));// 手动 ackchannel.basicAck(envelope.getDeliveryTag(), false);}};/*指定消费哪个队列指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)指定消费回调*/channel.basicConsume(queue2, false, consumer);// 一直运行System.in.read();channel.close();connection.close();}}//public class Publish {@Testpublic void testPublish() throws IOException, TimeoutException {// 生产者Connection connection = RabbitConfig.getConnection();Channel channel = connection.createChannel();String queue1 = "queue1";String queue2 = "queue2";String exchange2 = "exchange2";String routingKey1 = "r1";String routingKey2 = "r2";//channel.exchangeDeclare(exchange2, BuiltinExchangeType.DIRECT);// 绑定队列channel.queueBind(queue1, exchange2, routingKey1);channel.queueBind(queue2, exchange2, routingKey2);/*指定exchange指定路由的规则, 使用具体的队列名称指定传递的消息所携带的properties指定发布的具体消息*/channel.basicPublish(exchange2, routingKey1, null, "routingKey1".getBytes());channel.basicPublish(exchange2, routingKey2, null, "routingKey2".getBytes());channel.basicPublish(exchange2, routingKey2, null, "routingKey2".getBytes());channel.basicPublish(exchange2, routingKey2, null, "routingKey2".getBytes());System.out.println("生产者发布消息成功");channel.close();connection.close();}}
Topic。RoutingKey 改为通配符,* 代表一个 xxx,# 代表多个 xxx.xxx。
//public class Consume1 {@Testpublic void testConsume() throws IOException, TimeoutException {// 消费者Connection connection = RabbitConfig.getConnection();Channel channel = connection.createChannel();// 指定当前消费者,一次消费多少个消息channel.basicQos(1);String queue1 = "queue1";/*指定队列的名称当前队列是否需要持久化是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)如果这个队列没有消费者在消费,队列自动删除指定当前队列的其他信息*/channel.queueDeclare(queue1, true, false, false, null);// 监听DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}/*接收: 快红狗*/System.out.println("接收: " + new String(body, "UTF-8"));// 手动 ackchannel.basicAck(envelope.getDeliveryTag(), false);}};/*指定消费哪个队列指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)指定消费回调*/channel.basicConsume(queue1, false, consumer);// 一直运行System.in.read();channel.close();connection.close();}}//public class Consume2 {@Testpublic void testConsume() throws IOException, TimeoutException {// 消费者Connection connection = RabbitConfig.getConnection();Channel channel = connection.createChannel();String queue2 = "queue2";String exchange = "exchange";// 创建 交换机channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT);/*指定队列的名称当前队列是否需要持久化是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)如果这个队列没有消费者在消费,队列自动删除指定当前队列的其他信息*/channel.queueDeclare(queue2, true, false, false, null);// 监听DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/*接收: 快红狗接收: 快黄猫接收: 慢黄兔*/System.out.println("接收: " + new String(body, "UTF-8"));// 手动 ackchannel.basicAck(envelope.getDeliveryTag(), false);}};/*指定消费哪个队列指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)指定消费回调*/channel.basicConsume(queue2, false, consumer);// 一直运行System.in.read();channel.close();connection.close();}}//public class Publish {@Testpublic void testPublish() throws IOException, TimeoutException {// 生产者Connection connection = RabbitConfig.getConnection();Channel channel = connection.createChannel();String queue1 = "queue1";String queue2 = "queue2";String exchange4 = "exchange4";String routingKey1 = "r1";String routingKey2 = "r2";//channel.exchangeDeclare(exchange4, BuiltinExchangeType.TOPIC);// 绑定队列channel.queueBind(queue1, exchange4, "*.red.*");channel.queueBind(queue2, exchange4, "fast.#");channel.queueBind(queue2, exchange4, "*.*.rabbit");/*指定exchange指定路由的规则, 使用具体的队列名称指定传递的消息所携带的properties指定发布的具体消息*/channel.basicPublish(exchange4, "fast.red.dog", null, "快红狗".getBytes());channel.basicPublish(exchange4, "fast.yellow.cat", null, "快黄猫".getBytes());channel.basicPublish(exchange4, "slow.yellow.rabbit", null, "慢黄兔".getBytes());System.out.println("生产者发布消息成功");channel.close();connection.close();}}
SpringBoot 整合 RabbitMQ。

// pom.xml<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.6.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>// application.ymlspring:rabbitmq:host: 10.36.144.157port: 5672username: testpassword: testvirtual-host: /virtual2listener:simple:acknowledge-mode: manual #设置为手动ackpublisher-confirms: true #开启confimr确认机制publisher-returns: true #开启return机制
// RabbitConfig.java@Configurationpublic class RabbitConfig {@Beanpublic TopicExchange topicExchange() {return new TopicExchange("boot-mq-exchange1", true, false);}@Beanpublic Queue queue() {return new Queue("boot-mq-queue", true, false, false);}@Beanpublic Binding binding(TopicExchange topicExchange, Queue queue) {return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*");}}
// RabbitMQListener.java@Componentpublic class RabbitMQListener {// 手动ack@RabbitListener(queues = "boot-mq-queue")public void receive(String msg, Channel channel, Message message) throws IOException {System.out.println("接收: " + msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
// 测试@SpringBootTest@RunWith(SpringRunner.class)public class DemoApplicationTests {// 框架注册@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void contextLoads() {rabbitTemplate.convertAndSend("boot-mq-exchange1","fast.red.dog","快红狗");}}
文章转载自java小小小小栈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




