上一片对SpringBoot做了一个简单的介绍,从本篇开始将通过整合其它组件分析SpringBoot源码。
如何使用ActiveMQ
1、引用spring-boot-starter-activemq
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2、添加配置
spring.activemq.in-memory=true
spring.activemq.pool.enabled=false
3、SampleActiveMQApplication.java
@SpringBootApplication
@EnableJms
public class SampleActiveMQApplication {
// 贰级天災
@Bean
public Queue queue() {
return new ActiveMQQueue("sample.queue");
}
public static void main(String[] args) {
SpringApplication.run(SampleActiveMQApplication.class, args);
}
}
4、Consumer.java
@Component
public class Consumer {
// 贰级天災
@JmsListener(destination = "sample.queue")
public void receiveQueue(String text) {
System.out.println(text);
}
}
5、Producer.java
@Component
public class Producer implements CommandLineRunner {
// 贰级天災
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
@Override
public void run(String... args) throws Exception {
send("Sample message");
System.out.println("Message was sent to the Queue");
}
public void send(String msg) {
this.jmsMessagingTemplate.convertAndSend(this.queue, msg);
}
}
实现CommandLineRunner接口主要是让项目启动后执行run方法。
以上几个类创建完成后就可以使用ActiveMQ了。
可以看到,SampleActiveMQApplication中注入了一个队列,Consumer中通过@JmsListener注解使得receiveQueue方法监听该队列。Producer中使用jmsMessagingTemplate给该队列发送消息。
为什么这些代码可以整合ActiveMQ?为什么JmsMessagingTemplate可以直接注入使用?
这里我们主要看一个类:JmsAutoConfiguration
@Configuration
@ConditionalOnClass({ Message.class, JmsTemplate.class })
@ConditionalOnBean(ConnectionFactory.class)
@EnableConfigurationProperties(JmsProperties.class)
@Import(JmsAnnotationDrivenConfiguration.class)
public class JmsAutoConfiguration {
// 贰级天災
@Configuration
protected static class JmsTemplateConfiguration {
private final JmsProperties properties;
private final ObjectProvider<DestinationResolver> destinationResolver;
private final ObjectProvider<MessageConverter> messageConverter;
public JmsTemplateConfiguration(JmsProperties properties,
ObjectProvider<DestinationResolver> destinationResolver,
ObjectProvider<MessageConverter> messageConverter) {
this.properties = properties;
this.destinationResolver = destinationResolver;
this.messageConverter = messageConverter;
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
PropertyMapper map = PropertyMapper.get();
JmsTemplate template = new JmsTemplate(connectionFactory);
template.setPubSubDomain(this.properties.isPubSubDomain());
map.from(this.destinationResolver::getIfUnique).whenNonNull()
.to(template::setDestinationResolver);
map.from(this.messageConverter::getIfUnique).whenNonNull()
.to(template::setMessageConverter);
mapTemplateProperties(this.properties.getTemplate(), template);
return template;
}
private void mapTemplateProperties(Template properties, JmsTemplate template) {
PropertyMapper map = PropertyMapper.get();
map.from(properties::getDefaultDestination).whenNonNull()
.to(template::setDefaultDestinationName);
map.from(properties::getDeliveryDelay).whenNonNull().as(Duration::toMillis)
.to(template::setDeliveryDelay);
map.from(properties::determineQosEnabled).to(template::setExplicitQosEnabled);
map.from(properties::getDeliveryMode).whenNonNull().as(DeliveryMode::getValue)
.to(template::setDeliveryMode);
map.from(properties::getPriority).whenNonNull().to(template::setPriority);
map.from(properties::getTimeToLive).whenNonNull().as(Duration::toMillis)
.to(template::setTimeToLive);
map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis)
.to(template::setReceiveTimeout);
}
}
@Configuration
@ConditionalOnClass(JmsMessagingTemplate.class)
@Import(JmsTemplateConfiguration.class)
protected static class MessagingTemplateConfiguration {
@Bean
@ConditionalOnMissingBean
@ConditionalOnSingleCandidate(JmsTemplate.class)
public JmsMessagingTemplate jmsMessagingTemplate(JmsTemplate jmsTemplate) {
return new JmsMessagingTemplate(jmsTemplate);
}
}
}
这个类帮忙注入了很多ActiveMQ要使用到的对象,其中就包括jmsMessagingTemplate。
再看看@JmsListener(destination = "sample.queue")为什么可以指定方法为消息接收方。看一下JmsListenerAnnotationBeanPostProcessor的postProcessAfterInitialization方法:
public Object postProcessAfterInitialization(final Object bean, String beanName) throws BeansException {
if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
Map<Method, Set<JmsListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<JmsListener>>) method -> {
Set<JmsListener> listenerMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, JmsListener.class, JmsListeners.class);
return (!listenerMethods.isEmpty() ? listenerMethods : null);
});
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(bean.getClass());
if (logger.isTraceEnabled()) {
logger.trace("No @JmsListener annotations found on bean type: " + bean.getClass());
}
}
else {
// Non-empty set of methods
annotatedMethods.forEach((method, listeners) ->
listeners.forEach(listener ->
processJmsListener(listener, method, bean)));
if (logger.isDebugEnabled()) {
logger.debug(annotatedMethods.size() + " @JmsListener methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
}
}
return bean;
}
加上断点之后可以看到:

SpringBoot根据注解找到了该方法,然后与方法绑定。来看看具体怎么绑定的:
protected void processJmsListener(JmsListener jmsListener, Method mostSpecificMethod, Object bean) {
Method invocableMethod = AopUtils.selectInvocableMethod(mostSpecificMethod, bean.getClass());
MethodJmsListenerEndpoint endpoint = createMethodJmsListenerEndpoint();
endpoint.setBean(bean);
endpoint.setMethod(invocableMethod);
endpoint.setMostSpecificMethod(mostSpecificMethod);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
endpoint.setEmbeddedValueResolver(this.embeddedValueResolver);
endpoint.setBeanFactory(this.beanFactory);
endpoint.setId(getEndpointId(jmsListener));
endpoint.setDestination(resolve(jmsListener.destination()));
if (StringUtils.hasText(jmsListener.selector())) {
endpoint.setSelector(resolve(jmsListener.selector()));
}
if (StringUtils.hasText(jmsListener.subscription())) {
endpoint.setSubscription(resolve(jmsListener.subscription()));
}
if (StringUtils.hasText(jmsListener.concurrency())) {
endpoint.setConcurrency(resolve(jmsListener.concurrency()));
}
JmsListenerContainerFactory<?> factory = null;
String containerFactoryBeanName = resolve(jmsListener.containerFactory());
if (StringUtils.hasText(containerFactoryBeanName)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
try {
factory = this.beanFactory.getBean(containerFactoryBeanName, JmsListenerContainerFactory.class);
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register JMS listener endpoint on [" +
mostSpecificMethod + "], no " + JmsListenerContainerFactory.class.getSimpleName() +
" with id '" + containerFactoryBeanName + "' was found in the application context", ex);
}
}
this.registrar.registerEndpoint(endpoint, factory);
}
新建了一个终端,设置基本的属性,然后解析@JmsListener注解,设置相关属性,最后将终端注册到JmsListenerContainerFactory。
事例代码地址:https://github.com/KAMIJYOUDOUMA/spring-boot-samples
本篇到此结束,如果读完觉得有收获的话,欢迎点赞、关注、加公众号【贰级天災】,查阅更多精彩历史!!!





