此文内容若含有机密信息,请不要擅自使用相关信息或转发给任何人,不然需要负法律责任,并请告知相关人及时删除并修改机密信息,谢谢。
之前写过.NET的C#版本的RabbitMQ的使用,如下,大家也可以看看
下面只讲解部分功能的使用,几种模式的使用步骤差不多。
官网
https://www.rabbitmq.com/getstarted.html
RabbitMQ模式有如下几种:



目前大家常用且熟知的可能是前五种,即简单模式、Work模式、发布/订阅模式、路由模型、Topic模式(通配符模式)
我们新建一个Springboot项目,pom.xml文件引入以下包
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>com.ctrip.framework.apollo</groupId><artifactId>apollo-client</artifactId><version>1.7.0</version></dependency></dependencies>
配置文件设置应用端口
server:port: 8018
目录结构如下:

1-Work模式

(1)在config目录下新建RabbitmqConfig文件,代码如下
@SpringBootConfigurationpublic class RabbitmqConfig {//region 配置一个工作模型队列@Beanpublic Queue queueWork() {return new Queue("work_queue");}
(2)在listener目录下新建WorkReceiveListener文件,内容如下
@Componentpublic class WorkReceiveListener {@RabbitListener(queues = "work_queue")public void receiveMessage(String msg, Channel channel, Message message) {// 只包含发送的消息System.out.println("work模式接收者1接收到消息:" + msg);// channel 通道信息// message 附加的参数信息}@RabbitListener(queues = "work_queue")public void receiveMessage2(Object obj, Channel channel, Message message) {// 包含所有的信息System.out.println("work模式接收者2接收到消息:" + obj);}}
(3)在mapper下新建RabbitmqMapper文件,内容如下:
@Componentpublic class RabbitmqMapper {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendWork() {for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("work_queue", "测试work: " + i);}}
(4)新建controller层新建文件RabbitmqController
@RestControllerpublic class RabbitmqController {@Autowiredprivate RabbitmqService rabbitmqService;//http://127.0.0.1:8018/sendWork@RequestMapping("/sendWork")public Object sendWork() {rabbitmqService.sendWork();return "发送成功...";}
(5)在service新增
public interface RabbitmqService {void sendWork();}
@Servicepublic class RabbitmqServiceImpl implements RabbitmqService {@Autowiredprivate RabbitmqMapper rabbitmqMapper;@Overridepublic void sendWork() {rabbitmqMapper.sendWork();}}
(6)在配置文件中加入Rabbitmq配置
# 应用名spring.application.name=springboot-rabbitmqrabbitmq配置信息ipspring.rabbitmq.host=172.17.17.17# 端口spring.rabbitmq.port=5672# 用户名spring.rabbitmq.username=admin# 密码spring.rabbitmq.password=admin# 配置虚拟机spring.rabbitmq.virtual-host=/# 消息开启手动确认spring.rabbitmq.listener.direct.acknowledge-mode=manual
访问接口
http://localhost:8018/sendWork
其他模式新建的方法步骤都类似,如下
2-发布/订阅模式

(1)在config目录下新建RabbitmqConfig文件,代码如下
//region 发布订阅模式// 声明两个队列@Beanpublic Queue queueFanout1() {return new Queue("fanout_queue_1");}@Beanpublic Queue queueFanout2() {return new Queue("fanout_queue_2");}// 准备一个交换机@Beanpublic FanoutExchange exchangeFanout() {return new FanoutExchange("fanout_exchange");}// 将交换机和队列进行绑定@Beanpublic Binding bindingExchange1() {return BindingBuilder.bind(queueFanout1()).to(exchangeFanout());}@Beanpublic Binding bindingExchange2() {return BindingBuilder.bind(queueFanout2()).to(exchangeFanout());}//endregion
(2)在listener目录下新建PublishReceiveListener文件,内容如下
@Componentpublic class PublishReceiveListener {@RabbitListener(queues = "fanout_queue_1")public void receiveMsg1(String msg) {System.out.println("fanout队列1接收到消息:" + msg);}@RabbitListener(queues = "fanout_queue_2")public void receiveMsg2(String msg) {System.out.println("fanout队列2接收到消息:" + msg);}}
(3)在mapper下新建RabbitmqMapper文件,内容如下:
// 向发布订阅模式里面发送消息public void sendPublish() {for (int i = 0; i < 5; i++) {// rabbitTemplate.convertSendAndReceive("exchange_fanout", "", "测试发布订阅:" + i);rabbitTemplate.convertAndSend("fanout_exchange", "", "测试发布订阅:" + i);}}
(4)新建controller层新建文件RabbitmqController
//http://127.0.0.1:8018/sendPublish@RequestMapping("/sendPublish")public String sendPublish() {rabbitmqService.sendPublish();return "发送成功...";}
(5)在service新增
void sendPublish();
@Overridepublic void sendPublish() {rabbitmqMapper.sendPublish();}
访问接口
http://localhost:8018/sendPublish
3-Topic模式

(1)在config目录下新建RabbitmqConfig文件,代码如下
//region topic 模型@Beanpublic Queue queueTopic1() {return new Queue("topic_queue_1");}@Beanpublic Queue queueTopic2() {return new Queue("topic_queue_2");}@Beanpublic TopicExchange exchangeTopic() {return new TopicExchange("topic_exchange");}@Beanpublic Binding bindingTopic1() {return BindingBuilder.bind(queueTopic1()).to(exchangeTopic()).with("topic.#");}@Beanpublic Binding bindingTopic2() {return BindingBuilder.bind(queueTopic2()).to(exchangeTopic()).with("topic.*");}//endregion
(2)在listener目录下新建TopicReceiveListener文件,内容如下
@Componentpublic class TopicReceiveListener {@RabbitListener(queues = "topic_queue_1")public void receiveMsg1(String msg) {System.out.println("topic消费者1接收到:" + msg);}@RabbitListener(queues = "topic_queue_2")public void receiveMsg2(String msg) {System.out.println("topic消费者2接收到:" + msg);}}
(3)在mapper下新建RabbitmqMapper文件,内容如下:
// 向topic发送数据public void sendTopic() {for (int i = 0; i < 10; i++) {if (i % 2 == 0) {rabbitTemplate.convertSendAndReceive("topic_exchange", "topic.route.topic", "测试发布订阅:" + i);} else {rabbitTemplate.convertSendAndReceive("topic_exchange", "topic.route", "测试发布订阅:" + i);}}}
(4)新建controller层新建文件RabbitmqController
@RequestMapping("/sendTopic")public String sendTopic() {rabbitmqService.sendTopic();return "发送成功...";}
(5)在service新增
void sendTopic();
@Overridepublic void sendTopic() {rabbitmqMapper.sendTopic();}
访问接口
http://localhost:8018/sendTopic
4-延迟队列
通常,延迟队列是有时间限制的,当到期后,通常会将消息放到另一个队列中。
(1)在config目录下新建RabbitmqConfig文件,代码如下
//region TTL 队列 死信队列@Value("${ttl_direct_product.user.exchange}")private String userExchange;@Value("${ttl_direct_product.user.queue}")private String userQueue;@Value("${ttl_direct_product.user.routingKey}")private String userRoutingKey;@Value("${ttl_direct_product.dlx.exchange}")private String dlxExchange;@Value("${ttl_direct_product.dlx.queue}")private String dlxQueue;@Value("${ttl_direct_product.dlx.routingKey}")private String dlxRoutingKey;/*** 声明死信队列* @return DirectExchange*/@Beanpublic DirectExchange dlxExchange() {return new DirectExchange(dlxExchange);}/*** 声明死信队列* @return Queue*/@Beanpublic Queue dlxQueue() {return new Queue(dlxQueue);}/*** 绑定死信队列到死信交换机* @return Binding*/@Beanpublic Binding binding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(dlxRoutingKey);}/*** 声明订单业务交换机* @return DirectExchange*/@Beanpublic DirectExchange userExchange() {return new DirectExchange(userExchange);}/*** 声明订单业务队列* @return Queue*/@Beanpublic Queue userQueue() {Map<String,Object> arguments = new HashMap<>(2);// 绑定该队列到私信交换机arguments.put("x-dead-letter-exchange",dlxExchange);arguments.put("x-dead-letter-routing-key",dlxRoutingKey);return new Queue(userQueue,true,false,false,arguments);}/*** 绑定订单队列到订单交换机* @return Binding*/@Beanpublic Binding userBinding() {return BindingBuilder.bind(userQueue()).to(userExchange()).with(userRoutingKey);}//endregion
(2)在listener目录下新建DirectTTLReceiveListener文件,内容如下
@Componentpublic class DirectTTLReceiveListener {@RabbitListener(queues = "ttl_direct_product.dlx.queue")public void receiveMsg1(String msg) {System.out.println("ttl_direct_product.dlx.queue接收到消息:" + msg);}}
(3)在mapper下新建RabbitmqMapper文件,内容如下:
// 向发布订阅模式里面发送消息public void sendDirectTTL() {for (int i = 0; i < 5; i++) {// rabbitTemplate.convertSendAndReceive("exchange_fanout", "", "测试发布订阅:" + i);rabbitTemplate.convertAndSend("ttl_direct_product.user.exchange", "ttl_direct_product.user.routingKey","测试死信队列消息"+i, message -> {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);message.getMessageProperties().setExpiration("10000");return message;});}}
(4)新建controller层新建文件RabbitmqController
@RequestMapping("/sendDirectTTL")public String sendDirectTTL() {rabbitmqService.sendDirectTTL();return "发送成功...";}
(5)在service新增
void sendDirectTTL();
@Overridepublic void sendDirectTTL() {rabbitmqMapper.sendDirectTTL();}
访问接口
http://localhost:8018/sendDirectTTL
5-将RabbitMQ配置放到Apollo配置中心
Apollo的安装这里就不说明了,直接讲解Apollo的配置
(1)新建一个应用

(2)填写如下内容,这里主要注意的是Appid的名字,会与应用代码中的配置有关联,命名尽量正常点,不要另类,我这里名字叫springboot-apollo-demo6

(3)

(4)填写以下内容到输入框中
spring.rabbitmq.host = 172.17.17.17spring.rabbitmq.port = 5672spring.rabbitmq.username = adminspring.rabbitmq.password = adminspring.rabbitmq.virtual-host = /spring.rabbitmq.listener.direct.acknowledge-mode = manual

勾选确定
(5)进入上面的应用代码中
修改配置文件如下

新建一个配置 文件,名称为application-dev.yml,内容如下
server:
port: 8018
application:
name: springboot-rabbitmq
app:
id: springboot-apollo-demo6
apollo:
meta: http://172.17.17.47:8080
namespace: application
(6)再在启动程序中添加如下注入

这时,我们的RabbitMQ的配置就是从配置中心读取了

更多分享请关注我的公众号




