暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

SpringBoot详细教程 | 第十篇:Spring Boot整合RabbitMQ

小东IT技术分享 2019-02-24
1025

这篇文章带你了解怎么整合RabbitMQ服务器,并且使用四种方式,通过它怎么去发送和接收消息。

一. 准备工作

1. 安装rabbitmq 如果你还没接触 可以看我这两篇文章 在我的网页博客里

www.lhdyx.cn

二.构建工程

1.构架一个SpringBoot工程,其pom文件依赖加上spring-boot-starter-amqp的起步依赖:

  1.    <dependency>

  2.        <groupId>org.springframework.boot</groupId>

  3.        <artifactId>spring-boot-starter-amqp</artifactId>

  4.    </dependency>

2.在application.properties配置mq

  1. #rabbitmq

  2. spring.rabbitmq.host=localhost

  3. #15672 是管理端端口

  4. spring.rabbitmq.port=5672

  5. #用户账号

  6. spring.rabbitmq.username=guest

  7. # 用户密码

  8. spring.rabbitmq.password=guest

  9. spring.rabbitmq.virtual-host=/

  10. # 消息发送到交换机确认机制,是否确认回调

  11. spring.rabbitmq.publisher-confirms=true

  12. # 消费者数量

  13. spring.rabbitmq.listener.simple.concurrency= 10

  14. spring.rabbitmq.listener.simple.max-concurrency= 10

  15. # 消费者每次从队列获取的消息数量

  16. spring.rabbitmq.listener.simple.prefetch= 1

  17. # 消费者自动启动

  18. spring.rabbitmq.listener.simple.auto-startup=true

  19. # 消费失败,自动重新入队

  20. spring.rabbitmq.listener.simple.default-requeue-rejected= true

  21. # 启用发送重试

  22. spring.rabbitmq.template.retry.enabled=true

  23. spring.rabbitmq.template.retry.initial-interval=1000ms

  24. spring.rabbitmq.template.retry.max-attempts=3

  25. spring.rabbitmq.template.retry.max-interval=10000ms

  26. spring.rabbitmq.template.retry.multiplier=1.0

3.创建MQ配置类

  1. package com.li.springbootrabbitmq.rabbitmq;


  2. import org.springframework.amqp.core.*;

  3. import org.springframework.context.annotation.Bean;

  4. import org.springframework.context.annotation.Configuration;


  5. import java.util.HashMap;

  6. import java.util.Map;


  7. /**

  8. * @author lihaodong

  9. **/

  10. @Configuration

  11. public class MqConfig {


  12.    // 测试队列

  13.    static final String QUEUE = "queue";

  14.    // Topic 交换机模式队列

  15.    static final String TOPIC_QUEUE1 = "topic.queue1";

  16.    static final String TOPIC_QUEUE2 = "topic.queue2";

  17.    static final String TOPIC_EXCHANGE = "topicExchange";

  18.    // Fanout模式队列

  19.    static final String FANOUT_QUEUE1 = "fanout.queue1";

  20.    static final String FANOUT_QUEUE2 = "fanout.queue2";

  21.    static final String FANOUT_EXCHANGE = "fanoutExchange";

  22.    // Header模式队列

  23.    static final String HEADERS_QUEUE = "headers.queue";

  24.    static final String HEADERS_EXCHANGE = "headersExchange";



  25.    /**

  26.     * Direct 交换机模式

  27.     */

  28.    //队列

  29.    @Bean

  30.    public Queue queue() {

  31.        return new Queue(QUEUE,true);

  32.    }


  33.    /**

  34.     * Topic 交换机模式

  35.     */

  36.    @Bean

  37.    public Queue topicQueue1(){

  38.        return new Queue(TOPIC_QUEUE1,true);

  39.    }

  40.    @Bean

  41.    public Queue topicQueue2(){

  42.        return new Queue(TOPIC_QUEUE2,true);

  43.    }

  44.    @Bean

  45.    public TopicExchange topicExchange(){

  46.        return new TopicExchange(TOPIC_EXCHANGE);

  47.    }

  48.    /**

  49.     * 绑定Exchange和queue 正确地将消息路由到指定的Queue

  50.     */

  51.    @Bean

  52.    public Binding topicBinding1(){

  53.        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.key1");

  54.    }

  55.    // #通配符,代表多个单词

  56.    @Bean

  57.    public Binding topicBinding2(){

  58.        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#");

  59.    }


  60.    /**

  61.     * Fanout模式 交换机Exchange

  62.     */

  63.    @Bean

  64.    public Queue fanoutQueue1(){

  65.        return new Queue(FANOUT_QUEUE1,true);

  66.    }

  67.    @Bean

  68.    public Queue fanoutQueue2(){

  69.        return new Queue(FANOUT_QUEUE2,true);

  70.    }


  71.    @Bean

  72.    public FanoutExchange fanoutExchange(){

  73.        return new FanoutExchange(FANOUT_EXCHANGE);

  74.    }


  75.    @Bean

  76.    public Binding fanoutBinding1(){

  77.        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());

  78.    }


  79.    @Bean

  80.    public Binding fanoutBinding2(){

  81.        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());

  82.    }


  83.    /**

  84.     * Header模式 交换机Exchange

  85.     */

  86.    @Bean

  87.    public HeadersExchange headersExchange(){

  88.        return new HeadersExchange(HEADERS_EXCHANGE);

  89.    }

  90.    @Bean

  91.    public Queue headersQueue(){

  92.        return new Queue(HEADERS_QUEUE,true);

  93.    }

  94.    @Bean

  95.    public Binding headersBinding(){

  96.        Map<String, Object> map = new HashMap<>();

  97.        map.put("header1","value1");

  98.        map.put("header2","value2");

  99.        return BindingBuilder.bind(headersQueue()).to(headersExchange()).whereAll(map).match();

  100.    }

  101. }

4.创建发送者

  1. package com.li.springbootrabbitmq.rabbitmq;


  2. import org.slf4j.Logger;

  3. import org.slf4j.LoggerFactory;

  4. import org.springframework.amqp.core.AmqpTemplate;

  5. import org.springframework.amqp.core.Message;

  6. import org.springframework.amqp.core.MessageProperties;

  7. import org.springframework.beans.factory.annotation.Autowired;

  8. import org.springframework.stereotype.Service;


  9. /**

  10. * @author lihaodong

  11. **/

  12. @Service

  13. public class MqSender {


  14.    private Logger logger = LoggerFactory.getLogger(MqSender.class);


  15.    @Autowired

  16.    private AmqpTemplate amqpTemplate;



  17.    /**

  18.     * Direct 交换机模式

  19.     */

  20.    public void send(Object message){

  21.        logger.info("send topic message: " + message);

  22.        amqpTemplate.convertAndSend(MqConfig.QUEUE, message);

  23.    }


  24.    /**

  25.     * Topic 交换机模式

  26.     */

  27.    public void sendTopic(Object message){

  28.        logger.info("send topic message: " + message);

  29.        amqpTemplate.convertAndSend(MqConfig.TOPIC_EXCHANGE,"topic.key1",message+"1");

  30.        amqpTemplate.convertAndSend(MqConfig.TOPIC_EXCHANGE,"topic.key2",message+"2");

  31.    }


  32.    /**

  33.     * Fanout模式 交换机Exchange

  34.     */

  35.    public void sendFanout(Object message){

  36.        logger.info("send fanout message: " + message);

  37.        amqpTemplate.convertAndSend(MqConfig.FANOUT_EXCHANGE,"",message+"1");

  38.    }


  39.    /**

  40.     * Header模式 交换机Exchange

  41.     *"header1","value1"要与队列初始化的时候一样

  42.     */

  43.    public void sendHeaders(Object message){

  44.        logger.info("send headers message: " + message);

  45.        MessageProperties properties = new MessageProperties();

  46.        properties.setHeader("header1","value1");

  47.        properties.setHeader("header2","value2");

  48.        Message obj = new Message(message.toString().getBytes(),properties);

  49.        amqpTemplate.convertAndSend(MqConfig.HEADERS_EXCHANGE,"",obj);

  50.    }

  51. }

5.创建接收者

  1. package com.li.springbootrabbitmq.rabbitmq;


  2. import org.slf4j.Logger;

  3. import org.slf4j.LoggerFactory;

  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;

  5. import org.springframework.stereotype.Service;


  6. /**

  7. * @author lihaodong

  8. **/


  9. @Service

  10. public class MqReceiver {


  11.    private Logger logger = LoggerFactory.getLogger(MqReceiver.class);


  12.    /**

  13.     * Direct 交换机模式

  14.     */

  15.    @RabbitListener(queues = MqConfig.QUEUE)

  16.    public void receive(String message){

  17.        logger.info("receive message" + message);

  18.    }


  19.    /**

  20.     * Topic 交换机模式

  21.     */

  22.    @RabbitListener(queues = MqConfig.TOPIC_QUEUE1)

  23.    public void receiveTopic1(String message){

  24.        logger.info("receive topic queue1 message: " + message);

  25.    }


  26.    @RabbitListener(queues = MqConfig.TOPIC_QUEUE2)

  27.    public void receiveTopic2(String message){

  28.        logger.info("receive topic queue2 message: " + message);

  29.    }


  30.    /**

  31.     * Fanout模式 交换机Exchange

  32.     */

  33.    @RabbitListener(queues = MqConfig.FANOUT_QUEUE1)

  34.    public void receiveFanout1(String message){

  35.        logger.info("receive fanout queue1 message: " + message);

  36.    }


  37.    @RabbitListener(queues = MqConfig.FANOUT_QUEUE2)

  38.    public void receiveFanout2(String message){

  39.        logger.info("receive fanout queue2 message: " + message);

  40.    }


  41.    /**

  42.     * Header模式 交换机Exchange

  43.     */

  44.    @RabbitListener(queues = MqConfig.HEADERS_QUEUE)

  45.    public void receiveFanout2(byte[] message){

  46.        logger.info("receive headers queue message: " + new String(message));

  47.    }


  48. }

6.单元测试类

  1. package com.li.springbootrabbitmq.test;


  2. import com.li.springbootrabbitmq.rabbitmq.MqSender;

  3. import org.junit.Test;

  4. import org.junit.runner.RunWith;

  5. import org.springframework.beans.factory.annotation.Autowired;

  6. import org.springframework.boot.test.context.SpringBootTest;

  7. import org.springframework.test.context.junit4.SpringRunner;


  8. /**

  9. * @ClassName MqTest

  10. * @Author lihaodong

  11. * @Date 2019/2/24 20:54

  12. * @Mail lihaodongmail@163.com

  13. * @Description mq测试

  14. * @Version 1.0

  15. **/

  16. @RunWith(SpringRunner.class)

  17. @SpringBootTest

  18. public class MqTest {


  19.    @Autowired

  20.    private MqSender mqSender;


  21.    @Test

  22.    public void mq() {

  23.        mqSender.send("LiHaodong you're the hero,李浩东是成为海贼王的男人!");

  24.    }


  25.    @Test

  26.    public void mq01() {

  27.        mqSender.sendTopic("LiHaodong you're the hero,李浩东是成为海贼王的男人!");

  28.    }

  29.    @Test

  30.    public void mq02() {

  31.        mqSender.sendFanout("LiHaodong you're the hero,李浩东是成为海贼王的男人!");

  32.    }


  33.    @Test

  34.    public void mq03() {

  35.        mqSender.sendHeaders("LiHaodong you're the hero,李浩东是成为海贼王的男人!");

  36.    }

  37. }

按照顺序启动测试程序,控制台打印:







源码下载:https://github.com/LiHaodong888/SpringBootLearn



文章转载自小东IT技术分享,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论