暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

RabbitMQ的整体架构

原创 尹国斌 2020-08-21
967

MQ的整体架构

image.png
Exchange 交换机接收消息,根据路由键(RoutingKey)转发消息到绑定的队列

4.1、交换机属性:

  • name:交换机名称
  • Type:交换机类型direct、topic、fanout、headers
  • Durability:是否需要持久化,true为持久化
  • Auto Delete:当最后一个绑定到exchange上的队列删除后,自动删除该exchange
  • Internal:当前exchange是否用于Rabbitmq内部使用,默认false
  • Arguments:扩展参数,用于扩展AMQP协议自制定化使用

4.2、交换机类型

4.2.1、direct Exchange

  • 所有发送到direct exchange的消息被转发到RouteKey中指定的Queue。

注意:direct 模式可以使用rabbitmq自带的exchange:default exchange,所以不需要将exchange进行任何绑定操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则将会被消息抛弃。
image.png

4.2.1 代码
生产者:
public class send {
public static void main(String[] args) throws java.io.IOException {
/**
* 创建连接连接到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
// 设置MabbitMQ所在主机ip或者主机名
factory.setHost(“192.168.60.129”);
factory.setPort(5672);
factory.setUsername(“guest”);
factory.setPassword(“guest”);
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个管道
Channel channel = connection.createChannel();
//声明
String exchangeName = “test_direct_exchange” ;
String routeKey = “test.direct”;
// 发送的消息
String message = “hello world!”;
// 往队列中发出一条消息
channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
System.out.println(" [x] Sent ‘" + message + "’");
// 关闭频道和连接
channel.close();
connection.close();
}

}
消费者
public class Recv {
public static void main(String[] args) throws java.io.IOException, java.lang.InterruptedException {

     ConnectionFactory factory = new ConnectionFactory();  
     factory.setHost("192.168.60.129");  
     factory.setPort(5672);
     factory.setUsername("guest");
     factory.setPassword("guest");
        // 打开连接和创建频道,与发送端一样  
        Connection connection = factory.newConnection();  
        
        Channel channel = connection.createChannel();  
  
        // 声明
        String exchangeName = "test_direct_exchange"	;
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routeKey = "test.direct";
        //声明了一个交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //声明一个队列
        channel.queueDeclare(queueName, false, false, false, null);
        //建立一个绑定关系
        channel.queueBind(queueName, exchangeName, routeKey);
        // 是否持久化消息
        QueueingConsumer  consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、consumer
        channel.basicConsume(queueName, true, consumer);
        //循环获取消息
        while(true){
        	//获取消息,如果没有消息,这一步将会阻塞
        	Delivery delivery = consumer.nextDelivery();
        	String msg = new String(delivery.getBody());
        	System.out.println("收到消息:"+ msg);
        }
}

}

控制台看到绑定
image.png

4.2.2、Topic Exchange

  • 所有发送到Topic Exchange 的消息被转发到所有关心RouteKey中指定Topic的Queue上

  • Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic
    注意:可以使用通配符进行模糊匹配(* 或者 #)
    image.png
    4.2.2 代码
    生产者
    public class send {

    public static void main(String[] args) throws java.io.IOException {
    /**
    * 创建连接连接到MabbitMQ
    */
    ConnectionFactory factory = new ConnectionFactory();
    // 设置MabbitMQ所在主机ip或者主机名
    factory.setHost(“192.168.60.129”);
    factory.setPort(5672);
    factory.setUsername(“guest”);
    factory.setPassword(“guest”);
    // 创建一个连接
    Connection connection = factory.newConnection();
    // 创建一个管道
    Channel channel = connection.createChannel();

         //声明
         String exchangeName = "test_topic_exchange"	;
         String routeKey1 = "user.save";
         String routeKey2 = "user.update";
         String routeKey3 = "user.delete.abc";
         
         // 发送的消息  
         String message = "hello world!-----topic exchange ..";  
         // 往队列中发出一条消息  
         channel.basicPublish(exchangeName, routeKey1, null, message.getBytes()); 
         channel.basicPublish(exchangeName, routeKey2, null, message.getBytes()); 
         channel.basicPublish(exchangeName, routeKey3, null, message.getBytes()); 
         
         System.out.println(" [x] Sent '" + message + "'");  
         // 关闭频道和连接  
         channel.close();  
         connection.close();  
     }  
    

消费者
public class Recv {

public static void main(String[] args) throws java.io.IOException, java.lang.InterruptedException  {
    
    // TODO Auto-generated method stub
     ConnectionFactory factory = new ConnectionFactory();  
     factory.setHost("192.168.60.129");  
     factory.setPort(5672);
     factory.setUsername("guest");
     factory.setPassword("guest");
        // 打开连接和创建频道,与发送端一样  
        Connection connection = factory.newConnection();  
        
        Channel channel = connection.createChannel();  
  
        // 声明
        String exchangeName = "test_topic_exchange"	;
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        
        String routeKey = "user.#";
        //声明了一个交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //声明一个队列
        channel.queueDeclare(queueName, false, false, false, null);
        //建立一个绑定关系
        channel.queueBind(queueName, exchangeName, routeKey);
        // 是否持久化消息
        QueueingConsumer  consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、consumer
        channel.basicConsume(queueName, true, consumer);
        //循环获取消息
        while(true){
        	//获取消息,如果没有消息,这一步将会阻塞
        	Delivery delivery = consumer.nextDelivery();
        	String msg = new String(delivery.getBody());
        	System.out.println("收到消息:"+ msg);
        }
}

}

查看交换机绑定情况:
image.png
查看队列绑定情况:
image.png

4.2.3、Fanout Exchange

  • 不处理路由键,只需要简单将队列绑定到交换机上
  • 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
  • Fanout交换机转发消息最快
    image.png
    2.2.3 代码

生产者
public class send {

 public static void main(String[] args) throws java.io.IOException {  
       /** 
        * 创建连接连接到MabbitMQ 
        */  
       ConnectionFactory factory = new ConnectionFactory();  
       // 设置MabbitMQ所在主机ip或者主机名  
       factory.setHost("192.168.60.129");  
       factory.setPort(5672);
       factory.setUsername("guest");
       factory.setPassword("guest");
       // 创建一个连接  
       Connection connection = factory.newConnection();  
       // 创建一个管道
       Channel channel = connection.createChannel();  
       
       //声明
       String exchangeName = "test_fanout_exchange"	;
       
       // 发送的消息  
       for(int i=0;i<10;i++){
           String message = "hello world!-----fanout exchange ..";  
           // 往队列中发出一条消息  
           channel.basicPublish(exchangeName, "caad ", null, message.getBytes()); 
       }
       
       // 关闭频道和连接  
       channel.close();  
       connection.close();  
   } 

}

消费者
public class Recv {

public static void main(String[] args) throws java.io.IOException, java.lang.InterruptedException  {
    
    // TODO Auto-generated method stub
     ConnectionFactory factory = new ConnectionFactory();  
     factory.setHost("192.168.60.129");  
     factory.setPort(5672);
     factory.setUsername("guest");
     factory.setPassword("guest");
        // 打开连接和创建频道,与发送端一样  
        Connection connection = factory.newConnection();  
        
        Channel channel = connection.createChannel();  
  
        // 声明
        String exchangeName = "test_fanout_exchange"	;
        String exchangeType = "fanout";
        String queueName = "test_fanout_queue";
        String routeKey = " ";  //不设置路由键
        
        //声明了一个交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //声明一个队列
        channel.queueDeclare(queueName, false, false, false, null);
        //建立一个绑定关系
        channel.queueBind(queueName, exchangeName, routeKey);
        // 是否持久化消息
        QueueingConsumer  consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、consumer
        channel.basicConsume(queueName, true, consumer);
        //循环获取消息
        while(true){
        	//获取消息,如果没有消息,这一步将会阻塞
        	Delivery delivery = consumer.nextDelivery();
        	String msg = new String(delivery.getBody());
        	System.out.println("收到消息:"+ msg);
        }
}

}

查看相关绑定:

image.png

4.3、Binding-绑定

  • Exchange和Exchange、Queue之间的连接关系
  • Binding中可以包含RoutingKey或者参数
  • 对于消息发布者而言它只负责把消息发布出去,甚至它也不知道消息是发到哪个queue,消息通过exchange到达queue,exchange的职责非常简单,就是一边接收发布者的消息一边把这些消息推到queue中。而exchange是怎么知道消息应该推到哪个queue呢,这就要通过绑定queue与exchange时的routingkey了,通过代码进行绑定并且指定routingkey

4.4、Queue-消息队列

  • 消息队列,实际存储消息数据
  • Durability:是否持久化,Durable:是,Transient:否
  • Auto delete:如选yes,代表当最后一个监听被移除后,该Queue会自动被删除
  • 4.5、Message-消息
  • 服务器和应用之间传送的数据
  • 本质上就是一段数据,由properties和Payload(body)组成
  • 常用属性:delivery、mode、headers(自定义属性)
  • 其他属性:content_type、content_encoding、priority
    correlation_id、reply_to、expiration、message_id
    type、user_id、app_id、cluster_id 、timestamp

给消息添加一些属性:
生产者:
public class send {

 public static void main(String[] args) throws java.io.IOException {  
       /** 
        * 创建连接连接到MabbitMQ 
        */  
       ConnectionFactory factory = new ConnectionFactory();  
       // 设置MabbitMQ所在主机ip或者主机名  
       factory.setHost("192.168.60.129");  
       factory.setPort(5672);
       factory.setUsername("guest");
       factory.setPassword("guest");
       // 创建一个连接  
       Connection connection = factory.newConnection();  
       // 创建一个管道
       Channel channel = connection.createChannel();  

//给消息添加一些属性
Map<String,Object> headers = new HashMap<String,Object>();
headers.put(“my1”, “111”);
headers.put(“my2”, “222”);

       //给消息添加一些属性
      AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
            .deliveryMode(2)
            .contentEncoding("utf-8")
            .expiration("10000")  //消息10秒没有被消费将被移除
            .headers(headers)  //自定义属性
            .build();
       
       // 发送的消息  
       for(int i=0;i<5;i++){
           String message = "hello world!-----fanout exchange ..";  
           // 往队列中发出一条消息  
 channel.basicPublish("", "test001", properties, message.getBytes()); 
       }
       // 关闭频道和连接  
       channel.close();  
       connection.close();  
   }  

}

消费者:
public class Recv {

public static void main(String[] args) throws java.io.IOException, java.lang.InterruptedException  {
    
    // TODO Auto-generated method stub
     ConnectionFactory factory = new ConnectionFactory();  
     factory.setHost("192.168.60.129");  
     factory.setPort(5672);
     factory.setUsername("guest");
     factory.setPassword("guest");
        // 打开连接和创建频道,与发送端一样  
        Connection connection = factory.newConnection();  
        
        Channel channel = connection.createChannel();  
  
        // 声明
        String queueName = "test001";
        //声明一个队列
        channel.queueDeclare(queueName, false, false, false, null);

        // 是否持久化消息
        QueueingConsumer  consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、consumer
        channel.basicConsume(queueName, true, consumer);
        //循环获取消息
        while(true){
        	//获取消息,如果没有消息,这一步将会阻塞
        	Delivery delivery = consumer.nextDelivery();
        	String msg = new String(delivery.getBody());
        	System.out.println("收到消息:"+ msg);
        	//获取消息属性
        	 Map<String,Object> headers = delivery.getProperties().getHeaders();
        	 System.out.println("headers get my1 value: "  + headers.get("my1"));
        }
}

}

4.6、Virtual host -虚拟主机

  • 虚拟地址,用于进行逻辑隔离,最上层的消息路由
  • 一个虚拟主机可以有若干个Exchange和Queue
  • 同一个virtual host里面的Exchange或Queue的名称不能相同
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论