暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

RabbitMQ的动态创建交换机、队列、绑定,延迟队列代码实现,包含项目地址

Java技术学习笔记 2020-08-20
1644

---实践是检验真理的唯一标准---

yml参数配置

这次我使用的是RabbitTemplate

rabbitmq:
host: 192.168.225.136
port: 5672
username: thinking
password: 123
virtual-host: host1
publisher-returns: true
# 事务模式下这行需要删除
publisher-confirm-type: correlated
template:
# 找不到路由规则的消息 是否保留
mandatory: true

为什么Template不需要定义configuration文件来接收yml文件的参数?

这是个常识问题,我这里做个记录。。。

我都能忘记昨天吃了东西的,好在我喜欢做笔记。。。

springboot中何时加载Template,可以仔细看看自动装配注解:EnableAutoConfiguration
这类Template模板的初始化有个Properties文件,不如:
RabbitProperties
RedisProperties
方法中注解:ConfigurationProperties  指定了默认取得yml格式内容
至于具体的属性可以找set方法

我们的demo都是基于RabbitTemplate来写。。。


初始化数据

通过枚举ExchangeEnumQueueEnumBindingEnum动态维护和创建

1.初始化交换机

@Bean("createExchange")
public Object createExchange(RabbitAdmin rabbitAdmin) {
// 遍历交换机枚举
ExchangeEnum.toList().forEach(exchangeEnum -> {
// 根据交换机模式 生成不同的交换机
switch (exchangeEnum.getType()) {
case fanout:
rabbitAdmin.declareExchange(new FanoutExchange(exchangeEnum.getExchangeName(),
exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));
break;
case topic:
rabbitAdmin.declareExchange(new TopicExchange(exchangeEnum.getExchangeName(),
exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));
break;
case direct:
rabbitAdmin.declareExchange(new DirectExchange(exchangeEnum.getExchangeName(),
exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));
break;
}
});
return null;
}

2.初始化队列

@Bean("createQueue")
public Object createQueue(RabbitAdmin rabbitAdmin) {
// 遍历队列枚举 将队列注册到spring bean工厂 让spring实现队列的管理
QueueEnum.toList().forEach(queueEnum -> {
rabbitAdmin.declareQueue(new Queue(queueEnum.getName(),
queueEnum.isDurable(), queueEnum.isExclusive(), queueEnum.isAutoDelete(), queueEnum.getArguments()));
});
return null;
}

3.交换机和队列绑定

@Bean("createBinding")
public Object createBinding(RabbitAdmin rabbitAdmin) {
// 遍历队列枚举 将队列绑定到指定交换机
BindingEnum.toList().forEach(bindingEnum -> {
// 交换机
ExchangeEnum exchangeEnum = bindingEnum.getExchangeEnum();
// queue
QueueEnum queueEnum = bindingEnum.getQueueEnum();
// 绑定
rabbitAdmin.declareBinding(new Binding(
// queue名称
queueEnum.getName(),
Binding.DestinationType.QUEUE,
// exchange名称
exchangeEnum.getExchangeName(),
// queue的routingKey
queueEnum.getRoutingKey(),
// 绑定的参数
bindingEnum.getArguments()));
});
return null;
}

延迟队列

1.定义队列

/**
* 超时队列---不需要定义RabbitListener方法
*/
deal_queue("deal_queue""deal.queue"truefalsefalse, dealParams()),
/**
* 超时接收队列
*/
reply_queue("reply_queue", "reply.queue", true, false, false, null),
public static Map<String, Object> dealParams(){
// reply_to 队列
Map<String,Object> map = new HashMap<>();
//设置消息的过期时间 单位毫秒
map.put("x-message-ttl",10000);
//设置附带的死信交换机
map.put("x-dead-letter-exchange","reply_exchange");
//指定重定向的路由建 消息作废之后可以决定需不需要更改他的路由建 如果需要 就在这里指定
map.put("x-dead-letter-routing-key","reply.queue");
return map;
}

2.定义交换机

/**
* 超时交换机
*/
deal_exchange("deal_exchange", ExchangeTypeEnum.topic, truefalse),
/**
* 超时接收交换机
*/
reply_exchange("reply_exchange", ExchangeTypeEnum.topic, true, false),

3.交换机和队列绑定

deal_binding(ExchangeEnum.deal_exchangeQueueEnum.deal_queuenull),
reply_binding(ExchangeEnum.reply_exchange, QueueEnum.reply_queue, null)

4.不定义超时队列的@RabbitListener,只定义超时接收队列的@RabbitListener

@RabbitListener(queues = {"reply_queue"})
@RabbitHandler
public void reply_queue(Message message, Channel channel) throws Exception {
System.err.println("消费端-reply: " + new String(message.getBody(), "UTF-8"));
Long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
}

测试:

/**
* 延迟队列测试
*/
public void deal_queue_test() {
ExchangeEnum exchangeEnum = BindingEnum.deal_binding.getExchangeEnum();
QueueEnum queueEnum = BindingEnum.deal_binding.getQueueEnum();
// 消息
String message = "11111111111111111111111111111111111111";
MessageProperties messageProperties = getMessageProperties();
// 发送
rabbitTemplate.convertSendAndReceive(
exchangeEnum.getExchangeName(),
queueEnum.getRoutingKey(),
new Message(message.getBytes(), messageProperties));
}

异步队列

1.AsyncRabbitTemplate定义

/**
* 异步队列
* @param rabbitTemplate
* @return
*/
@Bean
public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate){
AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
asyncRabbitTemplate.setReceiveTimeout(50000);
return asyncRabbitTemplate;
}

2.测试

public void async() {
System.err.println("---------------async--------------start---------");
AsyncRabbitTemplate.RabbitConverterFuture<Object> future = asyncRabbitTemplate.convertSendAndReceive("reply_exchange", "reply.queue", "123123123");
// 配置下面代码时 如果 队列监听中没有返回值时会报错
future.addCallback(new ListenableFutureCallback<Object>() {
@Override
public void onFailure(Throwable ex) {
ex.printStackTrace();
}
@Override
public void onSuccess(Object result) {
System.out.println("回调收到结果=> " + result);
}
});
System.err.println("---------------async--------------end---------");
}

3.监听方法

@RabbitListener(queues = {"async_queue"})
@RabbitHandler
public Object async_queue(Message message, Channel channel) throws Exception {
System.err.println("消费端-async: " + new String(message.getBody(), "UTF-8"));
return "ok";
}

Java api

1.消息回退:

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

long deliveryTag:消息唯一标识,这是RabbitMQ自动生成的,不需要人为管理,只需要从message.getMessageProperties().getDeliveryTag() 就可以获得。
boolean multiple:是否批量退回,不开启就使用false,开启批量退回需要增加自己的业务判断逻辑(比如:攒够几条再批量回退,或者设置等待间隔等等)
boolean requeue:是否退回到消息队列,退回就使用true,就是交给其他消费者处理。

2.拒绝消息

void basicReject(long deliveryTag, boolean requeue) throws IOException;

deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,它在channel范围内是唯一的
requeue:表示如何处理这条消息,为true表示重新放入RabbitMQ的发送队列中,为false表示通知RabbitMQ销毁该消息

3.确认ack

void basicAck(long deliveryTag, boolean multiple) throws IOException;

deliveryTag:该消息的index
multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。

4.创建一个队列

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;


durable:truefalse true:在服务器重启时,能够存活
exclusive :是否为当前连接的专用队列,在连接断开后,会自动删除该队列,生产环境中应该很少用到吧。
autodelete:当没有任何消费者使用时,自动删除该队列

5.启动一个消费者,并返回服务端生成的消费者标识

/**
* queue:队列名
* autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器
* consumerTag:客户端生成的一个消费者标识
* nolocal:如果服务器不应将在此通道连接上发布的消息传递给此使用者,则为true;请注意RabbitMQ服务器上不支持此标记
* exclusive: 如果是单个消费者,则为true
* arguments:消费的一组参数
* deliverCallback: 当一个消息发送过来后的回调接口
* cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法
* shutdownSignalCallback: 当channel/connection 关闭后回调
*/
channel.basicConsume(QUEUE_NAME, true, ctag, falsefalse, arguments, deliverCallback, consumerTag -> {}, (consumerTag, sig) -> {});

6.取消消费者订阅

/**
* 取消消费者对队列的订阅关系
* consumerTag:服务器端生成的消费者标识
**/
void basicCancel(String consumerTag)

7.主动拉取队列中的一条消息

/**
* 从消息队列中取出第一条消息;整个方法的执行过程是首先消费队列,然后检索第一条消息,然后再取消订阅
*/
GetResponse response = channel.basicGet(QUEUE_NAME, true);
System.out.println("消费者接收到的消息是:"+new String(response.getBody(), "UTF-8"));

参数介绍

1.队列参数

x-dead-letter-exchange 死信交换机
x-dead-letter-routing-key 死信消息重定向路由键
x-expires 队列在指定毫秒数后被删除
x-ha-policy 创建HA队列
x-ha-nodes HA队列的分布节点
x-max-length 队列的最大消息数
x-message-ttl 毫秒为单位的消息过期时间,队列级别
x-max-priority 最大优先值为255的队列优先排序功能

2.消息参数

content-type 消息体的MIME类型,如application/json
content-encoding 消息的编码类型
message-id 消息的唯一性标识,由应用进行设置
correlation-id 一般用做关联消息的message-id,常用于消息的响应
timestamp 消息的创建时刻,整形,精确到秒
文章转载自Java技术学习笔记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论