---实践是检验真理的唯一标准---

yml参数配置
这次我使用的是RabbitTemplate
rabbitmq:host: 192.168.225.136port: 5672username: thinkingpassword: 123virtual-host: host1publisher-returns: true# 事务模式下这行需要删除publisher-confirm-type: correlatedtemplate:# 找不到路由规则的消息 是否保留mandatory: true
为什么Template不需要定义configuration文件来接收yml文件的参数?
这是个常识问题,我这里做个记录。。。
我都能忘记昨天吃了东西的,好在我喜欢做笔记。。。
springboot中何时加载Template,可以仔细看看自动装配注解:EnableAutoConfiguration这类Template模板的初始化有个Properties文件,不如:RabbitPropertiesRedisProperties方法中注解:ConfigurationProperties 指定了默认取得yml格式内容至于具体的属性可以找set方法
我们的demo都是基于RabbitTemplate来写。。。
初始化数据
通过枚举ExchangeEnum、QueueEnum、BindingEnum动态维护和创建
1.初始化交换机
@Bean("createExchange")public Object createExchange(RabbitAdmin rabbitAdmin) {// 遍历交换机枚举ExchangeEnum.toList().forEach(exchangeEnum -> {// 根据交换机模式 生成不同的交换机switch (exchangeEnum.getType()) {case fanout:rabbitAdmin.declareExchange(new FanoutExchange(exchangeEnum.getExchangeName(),exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));break;case topic:rabbitAdmin.declareExchange(new TopicExchange(exchangeEnum.getExchangeName(),exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));break;case direct:rabbitAdmin.declareExchange(new DirectExchange(exchangeEnum.getExchangeName(),exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));break;}});return null;}
2.初始化队列
@Bean("createQueue")public Object createQueue(RabbitAdmin rabbitAdmin) {// 遍历队列枚举 将队列注册到spring bean工厂 让spring实现队列的管理QueueEnum.toList().forEach(queueEnum -> {rabbitAdmin.declareQueue(new Queue(queueEnum.getName(),queueEnum.isDurable(), queueEnum.isExclusive(), queueEnum.isAutoDelete(), queueEnum.getArguments()));});return null;}
3.交换机和队列绑定
@Bean("createBinding")public Object createBinding(RabbitAdmin rabbitAdmin) {// 遍历队列枚举 将队列绑定到指定交换机BindingEnum.toList().forEach(bindingEnum -> {// 交换机ExchangeEnum exchangeEnum = bindingEnum.getExchangeEnum();// queueQueueEnum queueEnum = bindingEnum.getQueueEnum();// 绑定rabbitAdmin.declareBinding(new Binding(// queue名称queueEnum.getName(),Binding.DestinationType.QUEUE,// exchange名称exchangeEnum.getExchangeName(),// queue的routingKeyqueueEnum.getRoutingKey(),// 绑定的参数bindingEnum.getArguments()));});return null;}
延迟队列
1.定义队列
/*** 超时队列---不需要定义RabbitListener方法*/deal_queue("deal_queue", "deal.queue", true, false, false, dealParams()),/*** 超时接收队列*/reply_queue("reply_queue", "reply.queue", true, false, false, null),
public static Map<String, Object> dealParams(){// reply_to 队列Map<String,Object> map = new HashMap<>();//设置消息的过期时间 单位毫秒map.put("x-message-ttl",10000);//设置附带的死信交换机map.put("x-dead-letter-exchange","reply_exchange");//指定重定向的路由建 消息作废之后可以决定需不需要更改他的路由建 如果需要 就在这里指定map.put("x-dead-letter-routing-key","reply.queue");return map;}
2.定义交换机
/*** 超时交换机*/deal_exchange("deal_exchange", ExchangeTypeEnum.topic, true, false),/*** 超时接收交换机*/reply_exchange("reply_exchange", ExchangeTypeEnum.topic, true, false),
3.交换机和队列绑定
deal_binding(ExchangeEnum.deal_exchange, QueueEnum.deal_queue, null),reply_binding(ExchangeEnum.reply_exchange, QueueEnum.reply_queue, null)
4.不定义超时队列的@RabbitListener,只定义超时接收队列的@RabbitListener
@RabbitListener(queues = {"reply_queue"})@RabbitHandlerpublic void reply_queue(Message message, Channel channel) throws Exception {System.err.println("消费端-reply: " + new String(message.getBody(), "UTF-8"));Long deliveryTag = message.getMessageProperties().getDeliveryTag();channel.basicAck(deliveryTag, false);}
测试:
/*** 延迟队列测试*/public void deal_queue_test() {ExchangeEnum exchangeEnum = BindingEnum.deal_binding.getExchangeEnum();QueueEnum queueEnum = BindingEnum.deal_binding.getQueueEnum();// 消息String message = "11111111111111111111111111111111111111";MessageProperties messageProperties = getMessageProperties();// 发送rabbitTemplate.convertSendAndReceive(exchangeEnum.getExchangeName(),queueEnum.getRoutingKey(),new Message(message.getBytes(), messageProperties));}
异步队列
1.AsyncRabbitTemplate定义
/*** 异步队列* @param rabbitTemplate* @return*/@Beanpublic AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate){AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);asyncRabbitTemplate.setReceiveTimeout(50000);return asyncRabbitTemplate;}
2.测试
public void async() {System.err.println("---------------async--------------start---------");AsyncRabbitTemplate.RabbitConverterFuture<Object> future = asyncRabbitTemplate.convertSendAndReceive("reply_exchange", "reply.queue", "123123123");// 配置下面代码时 如果 队列监听中没有返回值时会报错future.addCallback(new ListenableFutureCallback<Object>() {@Overridepublic void onFailure(Throwable ex) {ex.printStackTrace();}@Overridepublic void onSuccess(Object result) {System.out.println("回调收到结果=> " + result);}});System.err.println("---------------async--------------end---------");}
3.监听方法
@RabbitListener(queues = {"async_queue"})@RabbitHandlerpublic Object async_queue(Message message, Channel channel) throws Exception {System.err.println("消费端-async: " + new String(message.getBody(), "UTF-8"));return "ok";}
Java api
1.消息回退:
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
long deliveryTag:消息唯一标识,这是RabbitMQ自动生成的,不需要人为管理,只需要从message.getMessageProperties().getDeliveryTag() 就可以获得。boolean multiple:是否批量退回,不开启就使用false,开启批量退回需要增加自己的业务判断逻辑(比如:攒够几条再批量回退,或者设置等待间隔等等)boolean requeue:是否退回到消息队列,退回就使用true,就是交给其他消费者处理。
2.拒绝消息
void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,它在channel范围内是唯一的requeue:表示如何处理这条消息,为true表示重新放入RabbitMQ的发送队列中,为false表示通知RabbitMQ销毁该消息
3.确认ack
void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag:该消息的indexmultiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
4.创建一个队列
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;durable:true、false true:在服务器重启时,能够存活exclusive :是否为当前连接的专用队列,在连接断开后,会自动删除该队列,生产环境中应该很少用到吧。autodelete:当没有任何消费者使用时,自动删除该队列
5.启动一个消费者,并返回服务端生成的消费者标识
/*** queue:队列名* autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器* consumerTag:客户端生成的一个消费者标识* nolocal:如果服务器不应将在此通道连接上发布的消息传递给此使用者,则为true;请注意RabbitMQ服务器上不支持此标记* exclusive: 如果是单个消费者,则为true* arguments:消费的一组参数* deliverCallback: 当一个消息发送过来后的回调接口* cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法* shutdownSignalCallback: 当channel/connection 关闭后回调*/channel.basicConsume(QUEUE_NAME, true, ctag, false, false, arguments, deliverCallback, consumerTag -> {}, (consumerTag, sig) -> {});
6.取消消费者订阅
/*** 取消消费者对队列的订阅关系* consumerTag:服务器端生成的消费者标识**/void basicCancel(String consumerTag)
7.主动拉取队列中的一条消息
/*** 从消息队列中取出第一条消息;整个方法的执行过程是首先消费队列,然后检索第一条消息,然后再取消订阅*/GetResponse response = channel.basicGet(QUEUE_NAME, true);System.out.println("消费者接收到的消息是:"+new String(response.getBody(), "UTF-8"));
参数介绍
1.队列参数
x-dead-letter-exchange 死信交换机x-dead-letter-routing-key 死信消息重定向路由键x-expires 队列在指定毫秒数后被删除x-ha-policy 创建HA队列x-ha-nodes HA队列的分布节点x-max-length 队列的最大消息数x-message-ttl 毫秒为单位的消息过期时间,队列级别x-max-priority 最大优先值为255的队列优先排序功能
2.消息参数
content-type 消息体的MIME类型,如application/jsoncontent-encoding 消息的编码类型message-id 消息的唯一性标识,由应用进行设置correlation-id 一般用做关联消息的message-id,常用于消息的响应timestamp 消息的创建时刻,整形,精确到秒
文章转载自Java技术学习笔记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




