
本文介绍如何在 Spring Boot 中整合 RabbitMQ,并演示通过四种方式(Direct、Topic、Fanout、Headers)发送和接收消息。
一. 准备工作
1. 安装 RabbitMQ。如果你还没有接触过,可以先阅读我博客里的两篇相关文章:
www.lhdyx.cn
二.构建工程
1.构建一个Spring Boot工程,在其pom文件中加上spring-boot-starter-amqp起步依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.在application.properties配置mq
#rabbitmq
spring.rabbitmq.host=localhost
# 5672 是 AMQP 客户端连接端口;15672 是管理控制台端口
spring.rabbitmq.port=5672
#用户账号
spring.rabbitmq.username=guest
# 用户密码
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# 开启发布确认:消息到达交换机后是否回调确认(Spring Boot 2.2+ 中该属性已废弃,请改用 spring.rabbitmq.publisher-confirm-type)
spring.rabbitmq.publisher-confirms=true
# 消费者数量
spring.rabbitmq.listener.simple.concurrency= 10
spring.rabbitmq.listener.simple.max-concurrency= 10
# 消费者每次从队列获取的消息数量
spring.rabbitmq.listener.simple.prefetch= 1
# 消费者自动启动
spring.rabbitmq.listener.simple.auto-startup=true
# 消费失败,自动重新入队
spring.rabbitmq.listener.simple.default-requeue-rejected= true
# 启用发送重试
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000ms
spring.rabbitmq.template.retry.max-attempts=3
spring.rabbitmq.template.retry.max-interval=10000ms
spring.rabbitmq.template.retry.multiplier=1.0
3.创建MQ配置类
package com.li.springbootrabbitmq.rabbitmq;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * Declares the queues, exchanges and bindings used by the messaging demos:
 * a plain queue, a topic exchange, a fanout exchange and a headers exchange.
 *
 * @author lihaodong
 */
@Configuration
public class MqConfig {

    /** Name of the queue used by the plain (direct) demo. */
    static final String QUEUE = "queue";

    /** Names of the queues and the exchange used by the topic demo. */
    static final String TOPIC_QUEUE1 = "topic.queue1";
    static final String TOPIC_QUEUE2 = "topic.queue2";
    static final String TOPIC_EXCHANGE = "topicExchange";

    /** Names of the queues and the exchange used by the fanout demo. */
    static final String FANOUT_QUEUE1 = "fanout.queue1";
    static final String FANOUT_QUEUE2 = "fanout.queue2";
    static final String FANOUT_EXCHANGE = "fanoutExchange";

    /** Names of the queue and the exchange used by the headers demo. */
    static final String HEADERS_QUEUE = "headers.queue";
    static final String HEADERS_EXCHANGE = "headersExchange";

    /** Builds a queue with the given name and the durable flag set to {@code true}. */
    private static Queue durableQueue(String name) {
        return new Queue(name, true);
    }

    // ------------------------------------------------------------------
    // Direct mode: a single queue; no exchange or binding is declared here.
    // ------------------------------------------------------------------

    /** Queue for the direct demo. */
    @Bean
    public Queue queue() {
        return durableQueue(QUEUE);
    }

    // ------------------------------------------------------------------
    // Topic mode
    // ------------------------------------------------------------------

    /** First queue of the topic demo. */
    @Bean
    public Queue topicQueue1() {
        return durableQueue(TOPIC_QUEUE1);
    }

    /** Second queue of the topic demo. */
    @Bean
    public Queue topicQueue2() {
        return durableQueue(TOPIC_QUEUE2);
    }

    /** Exchange of the topic demo. */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    /**
     * Binds the first topic queue to the topic exchange with the exact
     * routing key {@code topic.key1}, so messages are routed to that queue.
     */
    @Bean
    public Binding topicBinding1() {
        Queue target = topicQueue1();
        TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(target).to(exchange).with("topic.key1");
    }

    /**
     * Binds the second topic queue to the topic exchange with the pattern
     * {@code topic.#}; the {@code #} wildcard stands for multiple words.
     */
    @Bean
    public Binding topicBinding2() {
        Queue target = topicQueue2();
        TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(target).to(exchange).with("topic.#");
    }

    // ------------------------------------------------------------------
    // Fanout mode
    // ------------------------------------------------------------------

    /** First queue of the fanout demo. */
    @Bean
    public Queue fanoutQueue1() {
        return durableQueue(FANOUT_QUEUE1);
    }

    /** Second queue of the fanout demo. */
    @Bean
    public Queue fanoutQueue2() {
        return durableQueue(FANOUT_QUEUE2);
    }

    /** Exchange of the fanout demo. */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    /** Binds the first fanout queue to the fanout exchange. */
    @Bean
    public Binding fanoutBinding1() {
        Queue target = fanoutQueue1();
        return BindingBuilder.bind(target).to(fanoutExchange());
    }

    /** Binds the second fanout queue to the fanout exchange. */
    @Bean
    public Binding fanoutBinding2() {
        Queue target = fanoutQueue2();
        return BindingBuilder.bind(target).to(fanoutExchange());
    }

    // ------------------------------------------------------------------
    // Headers mode
    // ------------------------------------------------------------------

    /** Exchange of the headers demo. */
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange(HEADERS_EXCHANGE);
    }

    /** Queue of the headers demo. */
    @Bean
    public Queue headersQueue() {
        return durableQueue(HEADERS_QUEUE);
    }

    /**
     * Binds the headers queue to the headers exchange, requiring all of the
     * headers {@code header1=value1} and {@code header2=value2} to match.
     */
    @Bean
    public Binding headersBinding() {
        Map<String, Object> requiredHeaders = new HashMap<>();
        requiredHeaders.put("header1", "value1");
        requiredHeaders.put("header2", "value2");

        Queue target = headersQueue();
        HeadersExchange exchange = headersExchange();
        return BindingBuilder.bind(target).to(exchange).whereAll(requiredHeaders).match();
    }
}
4.创建发送者
package com.li.springbootrabbitmq.rabbitmq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Publishes demo messages to RabbitMQ using the queue and exchanges named in
 * {@link MqConfig}.
 *
 * @author lihaodong
 */
@Service
public class MqSender {

    private static final Logger logger = LoggerFactory.getLogger(MqSender.class);

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Direct mode: sends the message using {@link MqConfig#QUEUE} as the routing key.
     *
     * @param message payload to convert and send
     */
    public void send(Object message) {
        // Previously logged as "topic", which mislabelled the direct path.
        logger.info("send direct message: {}", message);
        amqpTemplate.convertAndSend(MqConfig.QUEUE, message);
    }

    /**
     * Topic mode: sends two messages to the topic exchange, one with routing key
     * {@code topic.key1} (payload suffixed with "1") and one with
     * {@code topic.key2} (payload suffixed with "2").
     *
     * @param message base payload
     */
    public void sendTopic(Object message) {
        logger.info("send topic message: {}", message);
        amqpTemplate.convertAndSend(MqConfig.TOPIC_EXCHANGE, "topic.key1", message + "1");
        amqpTemplate.convertAndSend(MqConfig.TOPIC_EXCHANGE, "topic.key2", message + "2");
    }

    /**
     * Fanout mode: sends the payload (suffixed with "1") to the fanout exchange
     * with an empty routing key.
     *
     * @param message base payload
     */
    public void sendFanout(Object message) {
        logger.info("send fanout message: {}", message);
        amqpTemplate.convertAndSend(MqConfig.FANOUT_EXCHANGE, "", message + "1");
    }

    /**
     * Headers mode: sends the payload as raw bytes to the headers exchange.
     * The headers set here ({@code header1=value1}, {@code header2=value2}) must
     * be the same as those used when the headers binding was declared.
     *
     * @param message payload; its {@code toString()} value is sent
     */
    public void sendHeaders(Object message) {
        logger.info("send headers message: {}", message);
        MessageProperties properties = new MessageProperties();
        properties.setHeader("header1", "value1");
        properties.setHeader("header2", "value2");
        // Encode explicitly as UTF-8 so non-ASCII payloads do not depend on the
        // platform default charset.
        byte[] body = message.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
        Message obj = new Message(body, properties);
        amqpTemplate.convertAndSend(MqConfig.HEADERS_EXCHANGE, "", obj);
    }
}
5.创建接收者
package com.li.springbootrabbitmq.rabbitmq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
 * Consumes the demo messages from the queues named in {@link MqConfig} and
 * logs each received payload.
 *
 * @author lihaodong
 */
@Service
public class MqReceiver {

    private static final Logger logger = LoggerFactory.getLogger(MqReceiver.class);

    /**
     * Direct mode: listens on {@link MqConfig#QUEUE}.
     *
     * @param message received payload
     */
    @RabbitListener(queues = MqConfig.QUEUE)
    public void receive(String message) {
        logger.info("receive message: {}", message);
    }

    /**
     * Topic mode: listens on {@link MqConfig#TOPIC_QUEUE1}.
     *
     * @param message received payload
     */
    @RabbitListener(queues = MqConfig.TOPIC_QUEUE1)
    public void receiveTopic1(String message) {
        logger.info("receive topic queue1 message: {}", message);
    }

    /**
     * Topic mode: listens on {@link MqConfig#TOPIC_QUEUE2}.
     *
     * @param message received payload
     */
    @RabbitListener(queues = MqConfig.TOPIC_QUEUE2)
    public void receiveTopic2(String message) {
        logger.info("receive topic queue2 message: {}", message);
    }

    /**
     * Fanout mode: listens on {@link MqConfig#FANOUT_QUEUE1}.
     *
     * @param message received payload
     */
    @RabbitListener(queues = MqConfig.FANOUT_QUEUE1)
    public void receiveFanout1(String message) {
        logger.info("receive fanout queue1 message: {}", message);
    }

    /**
     * Fanout mode: listens on {@link MqConfig#FANOUT_QUEUE2}.
     *
     * @param message received payload
     */
    @RabbitListener(queues = MqConfig.FANOUT_QUEUE2)
    public void receiveFanout2(String message) {
        logger.info("receive fanout queue2 message: {}", message);
    }

    /**
     * Headers mode: listens on {@link MqConfig#HEADERS_QUEUE}.
     * Previously named {@code receiveFanout2(byte[])}, a copy-paste overload of
     * the fanout listener.
     *
     * @param message raw message body
     */
    @RabbitListener(queues = MqConfig.HEADERS_QUEUE)
    public void receiveHeaders(byte[] message) {
        // Decode explicitly as UTF-8 rather than with the platform default
        // charset, so non-ASCII payloads are logged correctly.
        String text = new String(message, java.nio.charset.StandardCharsets.UTF_8);
        logger.info("receive headers queue message: {}", text);
    }
}
6.单元测试类
package com.li.springbootrabbitmq.test;
import com.li.springbootrabbitmq.rabbitmq.MqSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Spring Boot tests that publish one demo message through each of the four
 * send methods of {@link MqSender}.
 *
 * @author lihaodong (lihaodongmail@163.com)
 * @version 1.0, 2019/2/24 20:54
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class MqTest {

    /** Payload shared by every test case. */
    private static final String MESSAGE = "LiHaodong you're the hero,李浩东是成为海贼王的男人!";

    @Autowired
    private MqSender mqSender;

    /** Sends the payload through the direct path. */
    @Test
    public void mq() {
        mqSender.send(MESSAGE);
    }

    /** Sends the payload through the topic exchange. */
    @Test
    public void mq01() {
        mqSender.sendTopic(MESSAGE);
    }

    /** Sends the payload through the fanout exchange. */
    @Test
    public void mq02() {
        mqSender.sendFanout(MESSAGE);
    }

    /** Sends the payload through the headers exchange. */
    @Test
    public void mq03() {
        mqSender.sendHeaders(MESSAGE);
    }
}
按照顺序启动测试程序,控制台打印:
源码下载:https://github.com/LiHaodong888/SpringBootLearn




