暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

SpringBoot2.0源码分析(二):SpringBoot整合ActiveMQ

贰级天災 2018-09-17
281

上一片对SpringBoot做了一个简单的介绍,从本篇开始将通过整合其它组件分析SpringBoot源码。

如何使用ActiveMQ

1、引用spring-boot-starter-activemq

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

2、添加配置

spring.activemq.in-memory=true
spring.activemq.pool.enabled=false

3、SampleActiveMQApplication.java

@SpringBootApplication
@EnableJms
public class SampleActiveMQApplication {
    // 贰级天災
    @Bean
    public Queue queue() {
        return new ActiveMQQueue("sample.queue");
    }

    public static void main(String[] args) {
        SpringApplication.run(SampleActiveMQApplication.class, args);
    }

}

4、Consumer.java

@Component
public class Consumer {
    // 贰级天災
    @JmsListener(destination = "sample.queue")
    public void receiveQueue(String text) {
        System.out.println(text);
    }

}

5、Producer.java

@Component
public class Producer implements CommandLineRunner {
    // 贰级天災
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Queue queue;

    @Override
    public void run(String... args) throws Exception {
        send("Sample message");
        System.out.println("Message was sent to the Queue");
    }

    public void send(String msg) {
        this.jmsMessagingTemplate.convertAndSend(this.queue, msg);
    }

}

实现CommandLineRunner接口主要是让项目启动后执行run方法。

以上几个类创建完成后就可以使用ActiveMQ了。

可以看到,SampleActiveMQApplication中注入了一个队列,Consumer中通过@JmsListener注解使得receiveQueue方法监听该队列。Producer中使用jmsMessagingTemplate给该队列发送消息。

为什么这些代码可以整合ActiveMQ?为什么JmsMessagingTemplate可以直接注入使用?
这里我们主要看一个类:JmsAutoConfiguration

@Configuration
@ConditionalOnClass({ Message.class, JmsTemplate.class })
@ConditionalOnBean(ConnectionFactory.class)
@EnableConfigurationProperties(JmsProperties.class)
@Import(JmsAnnotationDrivenConfiguration.class)
public class JmsAutoConfiguration {
    // 贰级天災
    @Configuration
    protected static class JmsTemplateConfiguration {

        private final JmsProperties properties;

        private final ObjectProvider<DestinationResolver> destinationResolver;

        private final ObjectProvider<MessageConverter> messageConverter;

        public JmsTemplateConfiguration(JmsProperties properties,
                ObjectProvider<DestinationResolver> destinationResolver,
                ObjectProvider<MessageConverter> messageConverter) {
            this.properties = properties;
            this.destinationResolver = destinationResolver;
            this.messageConverter = messageConverter;
        }

        @Bean
        @ConditionalOnMissingBean
        @ConditionalOnSingleCandidate(ConnectionFactory.class)
        public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
            PropertyMapper map = PropertyMapper.get();
            JmsTemplate template = new JmsTemplate(connectionFactory);
            template.setPubSubDomain(this.properties.isPubSubDomain());
            map.from(this.destinationResolver::getIfUnique).whenNonNull()
                    .to(template::setDestinationResolver);
            map.from(this.messageConverter::getIfUnique).whenNonNull()
                    .to(template::setMessageConverter);
            mapTemplateProperties(this.properties.getTemplate(), template);
            return template;
        }

        private void mapTemplateProperties(Template properties, JmsTemplate template) {
            PropertyMapper map = PropertyMapper.get();
            map.from(properties::getDefaultDestination).whenNonNull()
                    .to(template::setDefaultDestinationName);
            map.from(properties::getDeliveryDelay).whenNonNull().as(Duration::toMillis)
                    .to(template::setDeliveryDelay);
            map.from(properties::determineQosEnabled).to(template::setExplicitQosEnabled);
            map.from(properties::getDeliveryMode).whenNonNull().as(DeliveryMode::getValue)
                    .to(template::setDeliveryMode);
            map.from(properties::getPriority).whenNonNull().to(template::setPriority);
            map.from(properties::getTimeToLive).whenNonNull().as(Duration::toMillis)
                    .to(template::setTimeToLive);
            map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis)
                    .to(template::setReceiveTimeout);
        }

    }

    @Configuration
    @ConditionalOnClass(JmsMessagingTemplate.class)
    @Import(JmsTemplateConfiguration.class)
    protected static class MessagingTemplateConfiguration {

        @Bean
        @ConditionalOnMissingBean
        @ConditionalOnSingleCandidate(JmsTemplate.class)
        public JmsMessagingTemplate jmsMessagingTemplate(JmsTemplate jmsTemplate) {
            return new JmsMessagingTemplate(jmsTemplate);
        }

    }

}

这个类帮忙注入了很多ActiveMQ要使用到的对象,其中就包括jmsMessagingTemplate。

再看看@JmsListener(destination = "sample.queue")为什么可以指定方法为消息接收方。看一下JmsListenerAnnotationBeanPostProcessor的postProcessAfterInitialization方法:

    public Object postProcessAfterInitialization(final Object bean, String beanName) throws BeansException {
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
            Map<Method, Set<JmsListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<JmsListener>>) method -> {
                        Set<JmsListener> listenerMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                                method, JmsListener.class, JmsListeners.class);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(bean.getClass());
                if (logger.isTraceEnabled()) {
                    logger.trace("No @JmsListener annotations found on bean type: " + bean.getClass());
                }
            }
            else {
                // Non-empty set of methods
                annotatedMethods.forEach((method, listeners) ->
                        listeners.forEach(listener ->
                                processJmsListener(listener, method, bean)));
                if (logger.isDebugEnabled()) {
                    logger.debug(annotatedMethods.size() + " @JmsListener methods processed on bean '" + beanName +
                            "': " + annotatedMethods);
                }
            }
        }
        return bean;
    }

加上断点之后可以看到:

image

SpringBoot根据注解找到了该方法,然后与方法绑定。来看看具体怎么绑定的:


    protected void processJmsListener(JmsListener jmsListener, Method mostSpecificMethod, Object bean) {
        Method invocableMethod = AopUtils.selectInvocableMethod(mostSpecificMethod, bean.getClass());

        MethodJmsListenerEndpoint endpoint = createMethodJmsListenerEndpoint();
        endpoint.setBean(bean);
        endpoint.setMethod(invocableMethod);
        endpoint.setMostSpecificMethod(mostSpecificMethod);
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        endpoint.setEmbeddedValueResolver(this.embeddedValueResolver);
        endpoint.setBeanFactory(this.beanFactory);
        endpoint.setId(getEndpointId(jmsListener));
        endpoint.setDestination(resolve(jmsListener.destination()));
        if (StringUtils.hasText(jmsListener.selector())) {
            endpoint.setSelector(resolve(jmsListener.selector()));
        }
        if (StringUtils.hasText(jmsListener.subscription())) {
            endpoint.setSubscription(resolve(jmsListener.subscription()));
        }
        if (StringUtils.hasText(jmsListener.concurrency())) {
            endpoint.setConcurrency(resolve(jmsListener.concurrency()));
        }

        JmsListenerContainerFactory<?> factory = null;
        String containerFactoryBeanName = resolve(jmsListener.containerFactory());
        if (StringUtils.hasText(containerFactoryBeanName)) {
            Assert.state(this.beanFactory != null"BeanFactory must be set to obtain container factory by bean name");
            try {
                factory = this.beanFactory.getBean(containerFactoryBeanName, JmsListenerContainerFactory.class);
            }
            catch (NoSuchBeanDefinitionException ex) {
                throw new BeanInitializationException("Could not register JMS listener endpoint on [" +
                        mostSpecificMethod + "], no " + JmsListenerContainerFactory.class.getSimpleName() +
                        " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
            }
        }

        this.registrar.registerEndpoint(endpoint, factory);
    }

新建了一个终端,设置基本的属性,然后解析@JmsListener注解,设置相关属性,最后将终端注册到JmsListenerContainerFactory。


事例代码地址:https://github.com/KAMIJYOUDOUMA/spring-boot-samples


本篇到此结束,如果读完觉得有收获的话,欢迎点赞、关注、加公众号【贰级天災】,查阅更多精彩历史!!!




文章转载自贰级天災,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论