暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

RabbitMQ笔记(四)-CachingConnectionFactory

生活不止眼前的代码 2019-01-15
417

通常我们使用RabbitTemplate来进行简单的收发消息,而RabbitTemplate使用CachingConnectionFactory作为连接工厂

CachingConnectionFactory

配置bean

  1. @Bean

  2. public CachingConnectionFactory cachingConnectionFactory(){

  3.    CachingConnectionFactory factory = new CachingConnectionFactory();


  4.    factory.setAddresses(rabbitProperties.getAddresses());

  5.    factory.setUsername(rabbitProperties.getUsername());

  6.    factory.setPassword(rabbitProperties.getPassword());

  7.    factory.setVirtualHost(rabbitProperties.getVirtualHost());

  8.    factory.setPublisherConfirms(rabbitProperties.isPublisherConfirms());

  9.    factory.setPublisherReturns(rabbitProperties.isPublisherReturns());


  10.    factory.addChannelListener(rabbitChannelListener);

  11.    factory.addConnectionListener(rabbitConnectionListener);

  12.    factory.setRecoveryListener(rabbitRecoveryListener);


  13.    return factory;

  14. }

通常的教程这样配置,但是特别注意到官网有一段这样的提示

在一个应用里面同时存在消费者和生产者时,需要特别注意


文档建议使用一个具有相同选项的单独CachingConnectionFactory实例—一个用于生产者,一个用于消费者。

这是为了避免消费者由于生产者阻塞而阻塞

这里可以做一个测试,首先将RabbitMQ的内存水位调低,产生内存报警


再发送生产者的消息时,会发现产生了阻塞,同时添加一个监听者,这条指令同样也会发送阻塞

于是配置两个CachingConnectionFactory

  1. private CachingConnectionFactory getCachingConnectionFactory() {

  2.        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();


  3.        cachingConnectionFactory.setAddresses(rabbitProperties.getAddresses());

  4.        cachingConnectionFactory.setUsername(rabbitProperties.getUsername());

  5.        cachingConnectionFactory.setPassword(rabbitProperties.getPassword());

  6.        cachingConnectionFactory.setVirtualHost(rabbitProperties.getVirtualHost());

  7.        cachingConnectionFactory.setPublisherConfirms(rabbitProperties.isPublisherConfirms());

  8.        cachingConnectionFactory.setPublisherReturns(rabbitProperties.isPublisherReturns());



  9.        cachingConnectionFactory.addChannelListener(rabbitChannelListener);

  10.        cachingConnectionFactory.addConnectionListener(rabbitConnectionListener);

  11.        cachingConnectionFactory.setRecoveryListener(rabbitRecoveryListener);



  12.        return cachingConnectionFactory;

  13.    }


  14.    @Bean("test-consumer-connection-factory")

  15.    public CachingConnectionFactory consumerCachingConnectionFactory() {


  16.        return getCachingConnectionFactory();


  17.    }


  18.    @Bean

  19.    @Primary

  20.    public CachingConnectionFactory cachingConnectionFactory() {


  21.        return getCachingConnectionFactory();

  22.    }

将@Bean("test-consumer-connection-factory") 用于消费者 则在发送阻塞之后,消费者的通道仍然是畅通的

当然由于使用RabbitTemplate,也可以在RabbitTemplate配置

  1. rabbitTemplate.setUsePublisherConnection(true);

这里有三个监听器

ChannelListener 用于监听通道的创建和销毁

  1. @Service

  2. public class RabbitChannelListener implements ChannelListener {

  3.    @Override

  4.    public void onCreate(Channel channel, boolean b) {

  5.        log.info("======================onCreate channel: {}, transactional: {}", channel, b);

  6.    }


  7.    @Override

  8.    public void onShutDown(ShutdownSignalException signal){

  9.    // 可根据isHardError判断是channel断开还是connection断开

  10.        if(signal.isHardError()){

  11.            AMQImpl.Connection.Close close = (AMQImpl.Connection.Close) signal.getReason();

  12.            log.warn("=====================Connection onShutDown replyCode: {}, methodId: {}, classId: {}, replyText: {}",

  13.                    close.getReplyCode(), close.getMethodId(), close.getClassId(), close.getReplyText());

  14.        }else {

  15.            AMQImpl.Channel.Close close = (AMQImpl.Channel.Close) signal.getReason();

  16.            log.warn("=====================Channel onShutDown replyCode: {}, methodId: {}, classId: {}, replyText: {}",

  17.                    close.getReplyCode(), close.getMethodId(), close.getClassId(), close.getReplyText());

  18.        }

  19.    }

  20. }

ConnectionListener 用于监听连接的创建和关闭

  1. public class RabbitConnectionListener implements ConnectionListener {

  2.    @Override

  3.    public void onCreate(Connection connection) {

  4.        log.info("================onCreate: {}", connection);

  5.    }


  6.    @Override

  7.    public void onClose(Connection connection) {

  8.        log.info("================onClose: {}", connection);

  9.    }


  10.    @Override

  11.    public void onShutDown(ShutdownSignalException signal) {

  12.        log.info("================onShutDown: {}", signal);

  13.    }

  14. }

RecoveryListener 监听自动重连的情况,这个listener没有测试出在什么场景会出现

  1. public class RabbitRecoveryListener implements RecoveryListener {

  2.    @Override

  3.    public void handleRecovery(Recoverable recoverable) {

  4.        log.info("================handleRecovery: {}", recoverable);

  5.    }


  6.    @Override

  7.    public void handleRecoveryStarted(Recoverable recoverable) {

  8.        log.info("================handleRecoveryStarted: {}", recoverable);

  9.    }

  10. }


文章转载自生活不止眼前的代码,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论