暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

RabbitMQ六种队列模式之路由模式

黎明大大 2021-04-07
2051
点击“蓝字”关注我们吧


前言



本文接着带大家伙了解RabbitMQ队列模式中的路由模式,其实只要看过我前面写的发布订阅模式的文章后,相信路由模式上手就非常 easy 了,唯一差距就是两个参数,exchange类型和 routingKey 。


路由模式



什么是路由模式
路由模式跟发布订阅模式类似,然后在订阅模式的基础上加上了类型,订阅模式是分发到所有绑定到交换机的队列,路由模式只分发到绑定在交换机上面指定路由键的队列


功能介绍
图解:
  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key

  • X:Exchange(交换机),接受生产者的消息,然后把消息递交给与 routing key 完全匹配的队列

  • 红色方块:队列,用于存放消息

  • C1:消费者,其所在队列指定了需要 routing key 为 error的消息

  • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning的消息


Routing 路由模式
路由模式的交换机类型是Direct,Direct交换机的特点,就决定了路由模式的工作模式,即只有消息的 Routing key 与Binding key 相同时,交换机才会把消息发给该队列。


代码演示

本文是基于SpringBoot框架去集成的RabbitMQ,所以最好会SpringBoot基础,再跟着本文一起搭建路由队列Demo


创建一个简单的maven项目


导入依赖
首先在我的父工程 pom.xml 导入maven依赖
    <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.RELEASE</version>
    </parent>
    <dependencies>

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.8</version>
    </dependency>
    </dependencies>


    生产者
    生产者项目结构


    pom文件
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-web</artifactId>
      </dependency>

      yml文件
        server:
         port: 8081
        spring:
         rabbitmq:
           ####连接地址
           host: 192.168.137.5
           ####端口号
           port: 5672
           ####账号
           username: sunny
           ####密码
           password: sunny
           ### 交换机
           virtual-host: /sunny_vm

        生产者配置类
          @Configuration
          public class RabbbitMqConfig {




          public static final String EXCHANGE_NAME = "sunny_routingkey_exchange";


          /**
          * SMS队列名称
          */
          public static final String SMS_QUEUE_NAME = "sms_direct_routingkey_queue";


          /**
          * email队列名称
          */
          public static final String EMAIL_QUEUE_NAME = "email_direct_routingkey_queue";




          /**
          * sms 路由key名称
          */
          public static final String SMS_ROUTING_KEY = "sms_routing_key";


          /**
          * email 路由key名称
          */
          public static final String EMAIL_ROUTING_KEY = "email_routing_key";


          /**
          * 声明短信队列
          *
          * @return
          */
          @Bean
          public Queue smsQueue() {
          return new Queue(SMS_QUEUE_NAME, true);
          }


          /**
          * 声明email队列
          *
          * @return
          */
          @Bean
          public Queue emailQueue() {
          return new Queue(EMAIL_QUEUE_NAME, true);
          }


          /**
          * 声明一个Direct类型的交换机
          *
          * @return
          */
          @Bean
          public DirectExchange directExchange() {
          return new DirectExchange(EXCHANGE_NAME);
          }


          /**
          * 将上面的sms队列绑定到Direct交换机
          *
          * @param smsQueue
          * @param directExchange
          * @return
          */
          @Bean
          public Binding smsQueueDirectExchange(Queue smsQueue, DirectExchange directExchange) {
          return BindingBuilder.bind(smsQueue).to(directExchange).with(SMS_ROUTING_KEY);
          }


          /**
          * 将上面的email队列绑定到Direct交换机
          *
          * @param emailQueue
          * @param directExchange
          * @return
          */
          @Bean
          public Binding emailQueueDirectExchange(Queue emailQueue, DirectExchange directExchange) {
          return BindingBuilder.bind(emailQueue).to(directExchange).with(EMAIL_ROUTING_KEY);
          }
          }


          用户实体类
            @Data
            public class UserEntity implements Serializable {


            private Integer id;
            private String name;
            }

            生产者发送消息
              @RestController
              public class SmsProducerController {


              @Autowired
              private RabbitTemplate rabbitTemplate;


              @GetMapping("/send/sms")
              public void send() {


              UserEntity userEntity = new UserEntity();
              userEntity.setId(new Random().nextInt());
              userEntity.setName("短信:" + UUID.randomUUID().toString());
              rabbitTemplate.convertAndSend(RabbbitMqConfig.EXCHANGE_NAME, RabbbitMqConfig.SMS_ROUTING_KEY, userEntity);


              System.out.println("短信生产者消息发送成功!!!");
              }
              }


              @RestController
              public class EmailProducerController {


              @Autowired
              private RabbitTemplate rabbitTemplate;


              @GetMapping("/send/email")
              public void send() {


              UserEntity userEntity = new UserEntity();
              userEntity.setId(new Random().nextInt());
              userEntity.setName("邮件:" + UUID.randomUUID().toString());
              rabbitTemplate.convertAndSend(RabbbitMqConfig.EXCHANGE_NAME, RabbbitMqConfig.EMAIL_ROUTING_KEY, userEntity);


              System.out.println("邮件生产者消息发送成功!!!");
              }
              }


              生产者测试发送消息
              打开浏览器,访问指定网址
                http://localhost:8081/send/sms
                http://localhost:8081/send/email


                登陆Mangerment界面,可以看到我们在配置文件中配置的交换机名称,
                SpringBoot自动在RabbitMQ里面,已经帮我们创建好了,且交换机的类型为direct类型。


                我们还可以点击交换机的名称,然后看到交换机绑定队列的关系图等。


                然后可以看到,我绑定交换机的两个队列,分别都积压着消息没有被消费掉


                消费者
                消费者项目结构


                yml文件
                  server:
                  port: 8080


                  spring:
                  rabbitmq:
                  ####连接地址
                  host: 192.168.137.5
                  ####端口号
                  port: 5672
                  ####账号
                  username: sunny
                  ####密码
                  password: sunny
                  ### 交换机
                  virtual-host: /sunny_vm

                  用户实体类
                    @Data
                    public class UserEntity implements Serializable {


                    private Integer id;
                    private String name;
                    }

                    新建2个消费者,监听不同的队列
                      @Component
                      @RabbitListener(queues = {"email_direct_routingkey_queue"})
                      public class EmailConsumerController {
                      @RabbitHandler
                      public void one(UserEntity userEntity, Channel channel, Message message) {
                      System.out.println("邮件队列接收到消息了!!!" + userEntity.getName());
                      }
                      }




                      @Component
                      @RabbitListener(queues = {"sms_direct_routingkey_queue"})
                      public class SmsConsumerController {
                      @RabbitHandler
                      public void one(UserEntity userEntity, Channel channel, Message message) {
                      System.out.println("短信队列接收到消息了!!!" + userEntity.getName());
                      }
                      }


                      启动消费者项目,项目启动后会自动消费消息


                      队列中积压的消息被成功消费


                      到此SpringBoot整合RabbitMQ实现路由模式代码Demo就结束拉


                      总结

                      1、队列与交换机的绑定,不能是任意绑定了,而是指定一个 RoutingKey(路由key)

                      2、消息的发送方,向Exchange发送消息时,也必须指定消息的RoutingKey

                      3、Exchange 不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接受到消息


                      我是黎明大大,我知道我没有惊世的才华,也没有超于凡人的能力,但毕竟我还有一个不屈服,敢于选择向命运冲锋的灵魂,和一个就是伤痕累累也要义无反顾走下去的心。


                      如果您觉得本文对您有帮助,还请关注点赞一波,后期将不间断更新更多技术文章


                      扫描二维码关注我
                      不定期更新技术文章哦



                      RabbitMQ六种队列模式之发布订阅模式

                      RabbitMQ六种队列模式之工作队列模式

                      RabbitMQ六种队列模式之简单队列模式

                      我们所了解的Redis分布式锁真的就万无一失吗?

                      深入理解Redis的持久化机制

                      Spring5.0源码深度解析之Spring是如何利用三级缓存解决循环依赖的问题



                      发现“在看”和“赞”了吗,因为你的点赞,让我元气满满哦
                      文章转载自黎明大大,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                      评论