基本名词
connectionFactory:连接工厂(类似连接池)
connection:连接
channel:通道
exchange:交换机
queue:队列
基本步骤
1.初始化连接工厂(connectionFactory)
2.从连接工厂中获取连接(connection)
3.在连接中打开通道(channel)
4.在通道中声明交换机(exchange)
5.在通道中声明队列(一个或多个)
6.绑定队列到交换机
7.进行相关操作
工作原理
生产者--消费者模式:基本的模式是生产者生成消息,投入到列队中,需要的消费者(订阅)去列队拿消息进行处理。
rabbitmq在这里进行了一个中间处理,消息投递给哪个列队,由路由器或者称交换机(Exchange)来处理,即生产者不直接将消息投递到列队中,而是投递到交换机,具体要投递到哪个列队,由交换机根据路由规则来确定。类似于寄信,寄送人不直接将信件投递到派件员手中,而是投递到邮局,邮局根据信件的地址来决定到送到哪位派件员手上,进而送达收件人。
因此,rabbitmq的工作模式就是producer将消息投递到特定的exchange,queue按routing key订阅消息(例如,列队A订阅了交换机E的路由键为test的消息,那么一个生产者投递两个消息到E中两个消息,routing key分别为test和test2,那么A列队的只能收到路由键为test的消息),符合路由键的消息将被分发到具体的列队中。
交换机路由方式
交换机常用的路由方式有四种,fanout、direct、topic、header。header用的较少fanout是简单地将消息分发给所有列队
direct是根据路由键直接分发
topic可以说是是direct的扩展,引入了匹配模式

连接工厂配置
/**
* Created by syd on 2020/02/04.
*/
@Slf4j
@Configuration
public class ManagerRabbitMqConfig {
/**
* mq host
*/
@Value("${spring.rabbitmq.host}")
private String host;
/**
* 端口
*/
@Value("${spring.rabbitmq.port}")
private int port;
/**
* 用户名
*/
@Value("${spring.rabbitmq.username}")
private String username;
/**
* 密码
*/
@Value("${spring.rabbitmq.password}")
private String password;
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
public MsgSendConfirmCallBack msgSendConfirmCallBack() {
return new MsgSendConfirmCallBack();
}
@Bean
public MsgSendReturnCallback msgSendReturnCallback() {
return new MsgSendReturnCallback();
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
CachingConnectionFactory connectionFactory = connectionFactory();
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
/**若使用confirm-callback或return-callback,
* 必须要配置publisherConfirms或publisherReturns为true
* 每个rabbitTemplate只能有一个confirm-callback和return-callback
*/
rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());
rabbitTemplate.setReturnCallback(msgSendReturnCallback());
/**
* 消息发送失败返回到队列中
* 使用return-callback时必须设置mandatory为true,或者在配置中设置mandatory-expression的值为true,
* 可针对每次请求的消息去确定’mandatory’的boolean值,
* 只能在提供’return-callback’时使用,与mandatory互斥
*/
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}
/**
* 定义新增客户经理队列
*
* @return
*/
@Bean
public Queue saveManagerQueue() {
return new Queue(ManagerRabbitConfig.BANK_MANAGER_SAVE_QUEUE, true, false, false);
}
/**
* 定义客户经理交换机 (type=广播方式)
*
* @return
*/
@Bean
public FanoutExchange saveManagerFanoutExchange() {
FanoutExchange directExchange = new FanoutExchange(ManagerRabbitConfig.BANK_MANAGER_SAVE_EXCHANGE, true, false);
return directExchange;
}
/**
* 队列和交换机绑定
*
* @return
*/
@Bean
public Binding saveManagerExchangeQueueBinding() {
return BindingBuilder.bind(saveManagerQueue()).to(saveManagerFanoutExchange());
}
消费者
/**
* 新增客户经理消费者
* Created by syd on 2020/02/04.
*/
@Slf4j
@Component
public class SaveManagerConsumer {
//监听队列的名称
@RabbitListener(queues = {ManagerRabbitConfig.BANK_MANAGER_SAVE_QUEUE})
public void receiveData(String request, Message message, Channel channel) {
log.info("客户经理信息:{}", request);
MqManagerMessage managerMessage = JacksonUtil.jsonToBean(request, MqManagerMessage.class);
try {
//todo
} catch (Exception e) {
log.error("{}", e.getMessage());
}
}
}
文章转载自沃了个去,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




