暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

RabbitMQ 使用之 amqp-client 方式

iWenhao 2022-02-15
317

使用RabbitMQ 可以是通过

  1. rabbit 提供的amqp-client

  2. spring 提供的 spring-rabbit

  3. springboot 提供的 spring-boot-start-amqp



Rabbit MQ 7中用法+ 3种高级用法


Rabbit MQ 7中用法:

  1. 简单模式(Hello world)

  2. 工作队列模式(Work Queue)

  3. 广播(Publish-Subcribe)

  4. 路由模式(Routing)

  5. Topic 模式

  6. RPC模式

  7. publish Confirm模式


简单模式(Hello world):

生产者将消息发送给默认的交换机(exchange = “”),代码中发送消息的时候,无需指定 exchange 只需要指定 queue. 消费者监听指定的消费者。


工作队列模式(Work Queue)

生产者将消息发送给默认的交换机(exchange = “”,代码中发送消息的时候,无需指定 exchange 只需要指定 queue. 消费者监听指定的消费者。

区别于简单模式(hello world)只是消费者的数量的区别。

在消费者指定 prefetch = 1 的条件下,且消费了消息,返回了 ACK 的情况下,消息只会发给一个消费者,如果被消费就不会发给另外的消费者,出发接受消息的消费者拒收消息。


广播(Publish-Subcribe)

生产者将消息发送给指定的交换机(fanout type),不需要指定 queue. 消费者监听指定的消费者。


路由模式(Routing)

生产者将消息发送给指定的交换机(direct type),不需要指定 queue, 消息会根据 Routing Key将消息路由给对应的消息队列(Queue). 消费者监听指定的消费者。


Topic 模式

生产者将消息发送给指定的交换机(topic type),不需要指定 queue, 消息会根据 Routing Key将消息通过 通配符的方式路由给对应的消息队列(Queue). 消费者监听指定的消费者。

注意:
可以代表一个单词
#  可以代表多个单词


aaa.*  匹配   aaa.bbb 
aaa.#  匹配   aaa.bbb  和 aaa.bbb.ccc


RPC模式

RPC 方式不推荐,如下是官方说明

       我们的代码仍然非常简单,并没有尝试解决更复杂(但重要)的问题,例如:
如果没有服务器在运行,客户端应该如何反应?
客户端是否应该为 RPC 设置某种超时?
如果服务器发生故障并引发异常,是否应该将其转发给客户端?
在处理之前防止无效的传入消息(例如检查边界)


Publish Confirm模式

生产者发送消息,可以监听回调,回调有两种 Confirm 和 Return 

消息发送给交换机成功/失败会有 confirm 回调, 如果消息发往队列成功失败会有 return 回调。



3种高级用法:

  1. TTL 队列

  2. 延时队列

  3. 死信队列


TTL队列消息:

TTL队列消息有时效,超过有效时间,就会销毁。


延时队列消息:

延时队列消息,生产者将消息成功发往Queue后一定时间后,才会被消费者监听到。


死信队列消息:

死信队列其实也是一只队列,只是用处是用来装那些不能消费的消息,或者特殊情况下的消息。


消息会在如下3中情况进入死信队列

  1. 消息被拒收(basicNack)/ basicReject 并且不将消息方会原队列(requeue=false).

  2. 队列中消息数量超过队列上限。

  3. 原消息队列设置了过期时间,消息过期未被消费,进入死信队列(正常情况)。



下面是代码演示


Connection

public class RabbitMqUtils {
private static ConnectionFactory connectionFactory = new ConnectionFactory();
static {
        connectionFactory.setHost("124.111.11.11");
connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号
connectionFactory.setUsername("conan");
connectionFactory.setPassword("conan");
connectionFactory.setVirtualHost("/conan");
}
public static Connection getConnection(){
Connection conn = null;
try {
conn = connectionFactory.newConnection();
return conn;
} catch (Exception e) {
throw new RuntimeException(e);
}
    }
}


Hello world 模式 生产者

Connection connection = RabbitMqUtils.getConnection();


Channel channel = connection.createChannel();


//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
//第一个参数:队列名称ID
//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
//其他额外的参数, null
channel.queueDeclare("hello_world_test", false, false, false, null);


String msg = "hello world rabbit mq";


//四个参数
        //exchange 交换机
//队列名称 ( hello world 模式下 虽然是显示是路由key 其实是 队列名称)
//额外的设置属性
//最后一个参数是要传递的消息字节数组
channel.basicPublish("", "hello_world_test", null, msg.getBytes());


channel.close();
connection.close();


Hello world 模式 消费者


      {  Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
//第一个参数:队列名称ID
//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
//其他额外的参数, null
        channel.queueDeclare("hello_world_test"falsefalsefalsenull);
channel.basicConsume("hello_world_test",new MyDefaultConsumer(channel)); }


class MyDefaultConsumer extends DefaultConsumer{


private Channel channel;


/**
     * Constructs a new instance and records its association to the passed-in channel.
* @param channel the channel to which this consumer is attached
*/
public MyDefaultConsumer(Channel channel) {
super(channel);
this.channel =channel;
}


@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("消费者接收到的消息:"+message);
System.out.println("消息的TagId:"+envelope.getDeliveryTag());
//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
channel.basicAck(envelope.getDeliveryTag(), false);
super.handleDelivery(consumerTag, envelope, properties, body);
}
}
       



工作队列模式 生产者

 public static void main(String[] args) throws IOException, TimeoutException {
/**
* 工作队列模式 work_queue 其实和 hello world 相似 都没有交换机 exchange(默认有个虚拟的交换机)
*
* 区别:hello world 只有一个消费者(虚拟交换机绑定一个队列), work queue 有多个消费者(虚拟交换机绑定多个队列)
*/
Connection connection = RabbitMqUtils.getConnection();


Channel channel = connection.createChannel();


channel.queueDeclare("work_queue_test", false, false, false, null);


for (int i = 1; i <= 100; i++) {
String msg = "工作队列模式 消息---》 " + i;
channel.basicPublish("", "work_queue_test", null, msg.getBytes());
}


channel.close();
        connection.close();
}


工作队列模式 消费者


public class ConsumerA {


public static void main(String[] args) throws IOException {




Connection connection = RabbitMqUtils.getConnection();


Channel channel = connection.createChannel();


// 每次只去一个处理
channel.basicQos(1);


channel.queueDeclare("work_queue_test",false,false,false,null);


channel.basicConsume("work_queue_test", new PublishDefaultConsumer(channel));


}
}


Topic 模式生产者


public class Producer {


public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {


Connection connection = RabbitMqUtils.getConnection();


Channel channel = connection.createChannel();


/**
* 开启监听模式
*
* confirm 和 return 都是 生产者 和 broker 之间的关系, 和消费者无关.
*/
channel.confirmSelect();


// 监听消息是否到达 exchange
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.printf("消息到达了 exchange " + deliveryTag);
}


@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.printf("消息不能到达了 exchange " + deliveryTag);
}
});


// 监听消息是否到达 队列


/**
* 在 topic 的模式下 如果消息 路由key 找不到指定的 队列, 就会存在被 return的情况
*
* mandatory 必须设置成 true
*/
channel.addReturnListener(new ReturnCallback() {
@Override
public void handle(Return returnMessage) {
System.out.printf("消息被退回 return ");
}
});




String msg = "确认消息 aaa.aaa";
String msg2 = "到不了的消息 aaa.bbb.aaa";


/**
* mandatory true代表如果消息无法正常投递则return回生产者,
* 如果false,则直接将消息放弃。
*/
channel.basicPublish("exchange_topic", "aaa.aaa", true, null, msg.getBytes());
channel.basicPublish("exchange_topic", "aaa.bbb.aaa", true, null, msg2.getBytes());




// Thread.sleep(5000);
// channel.close();
// connection.close();




}
}


Topic 模式消费者




public class ConsumerA {


public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();


Channel channel = connection.createChannel();


// 消费者指点自己的 队列
channel.queueDeclare("queue_exchange_topic", false, false, false, null);


// 队列绑定 交换机 exchange
channel.queueBind("queue_exchange_topic", "exchange_topic", "aaa.*");


// 每次取一个处理
        channel.basicQos(1);


channel.basicConsume("queue_exchange_topic", new PublishDefaultConsumer(channel));
}
}

 

交换机的 fanout, direct, topic 模式差不多还是自己试试看吧。

文章转载自iWenhao,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论