暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Java+RabbitMQ+Apollo配置中心

静思笃行的蜗牛 2021-11-15
1209

此文内容若含有机密信息,请不要擅自使用相关信息或转发给任何人,不然需要负法律责任,并请告知相关人及时删除并修改机密信息,谢谢。

之前写过.NET的C#版本的RabbitMQ的使用,如下,大家也可以看看

RabbitMQ-(1)几种模式


下面只讲解部分功能的使用,几种模式的使用步骤差不多。


官网

https://www.rabbitmq.com/getstarted.html

RabbitMQ模式有如下几种:


目前大家常用且熟知的可能是前五种,即简单模式、Work模式、发布/订阅模式、路由模型、Topic模式(通配符模式)


我们新建一个Springboot项目,pom.xml文件引入以下包


 <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>


<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>


<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
<version>1.7.0</version>
</dependency>
</dependencies>

配置文件设置应用端口

server:
port: 8018

目录结构如下:


1-Work模式


(1)在config目录下新建RabbitmqConfig文件,代码如下

@SpringBootConfiguration
public class RabbitmqConfig {


//region 配置一个工作模型队列
@Bean
    public Queue queueWork() {
return new Queue("work_queue");
}


(2)在listener目录下新建WorkReceiveListener文件,内容如下

@Component
public class WorkReceiveListener {
@RabbitListener(queues = "work_queue")
public void receiveMessage(String msg, Channel channel, Message message) {
// 只包含发送的消息
        System.out.println("work模式接收者1接收到消息:" + msg);
// channel 通道信息
// message 附加的参数信息
}


@RabbitListener(queues = "work_queue")
public void receiveMessage2(Object obj, Channel channel, Message message) {
// 包含所有的信息
        System.out.println("work模式接收者2接收到消息:" + obj);
}
}



(3)在mapper下新建RabbitmqMapper文件,内容如下:

@Component
public class RabbitmqMapper {
@Autowired
private RabbitTemplate rabbitTemplate;


public void sendWork() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work_queue", "测试work: " + i);
}
}


(4)新建controller层新建文件RabbitmqController

@RestController
public class RabbitmqController {
@Autowired
private RabbitmqService rabbitmqService;
//http://127.0.0.1:8018/sendWork
@RequestMapping("/sendWork")
public Object sendWork() {
rabbitmqService.sendWork();
return "发送成功...";
}


(5)在service新增

public interface RabbitmqService {
void sendWork();
}


@Service
public class RabbitmqServiceImpl implements RabbitmqService {
@Autowired
private RabbitmqMapper rabbitmqMapper;


@Override
public void sendWork() {
rabbitmqMapper.sendWork();
}
}


(6)在配置文件中加入Rabbitmq配置

# 应用名
spring.application.name=springboot-rabbitmq
rabbitmq配置信息
ip
spring.rabbitmq.host=172.17.17.17
# 端口
spring.rabbitmq.port=5672
# 用户名
spring.rabbitmq.username=admin
# 密码
spring.rabbitmq.password=admin
# 配置虚拟机
spring.rabbitmq.virtual-host=/
# 消息开启手动确认
spring.rabbitmq.listener.direct.acknowledge-mode=manual


访问接口

http://localhost:8018/sendWork


其他模式新建的方法步骤都类似,如下


2-发布/订阅模式



(1)在config目录下新建RabbitmqConfig文件,代码如下

//region  发布订阅模式
// 声明两个队列
@Bean
public Queue queueFanout1() {
return new Queue("fanout_queue_1");
}
@Bean
public Queue queueFanout2() {
return new Queue("fanout_queue_2");
}
// 准备一个交换机
@Bean
public FanoutExchange exchangeFanout() {
return new FanoutExchange("fanout_exchange");
}
// 将交换机和队列进行绑定
@Bean
public Binding bindingExchange1() {
return BindingBuilder.bind(queueFanout1()).to(exchangeFanout());
}
@Bean
public Binding bindingExchange2() {
return BindingBuilder.bind(queueFanout2()).to(exchangeFanout());
}
//endregion


(2)在listener目录下新建PublishReceiveListener文件,内容如下

@Component
public class PublishReceiveListener {


@RabbitListener(queues = "fanout_queue_1")
public void receiveMsg1(String msg) {
System.out.println("fanout队列1接收到消息:" + msg);
}


@RabbitListener(queues = "fanout_queue_2")
public void receiveMsg2(String msg) {
System.out.println("fanout队列2接收到消息:" + msg);
}
}



(3)在mapper下新建RabbitmqMapper文件,内容如下:



// 向发布订阅模式里面发送消息
public void sendPublish() {
for (int i = 0; i < 5; i++) {
// rabbitTemplate.convertSendAndReceive("exchange_fanout", "", "测试发布订阅:" + i);
rabbitTemplate.convertAndSend("fanout_exchange", "", "测试发布订阅:" + i);
}
}


(4)新建controller层新建文件RabbitmqController



//http://127.0.0.1:8018/sendPublish
@RequestMapping("/sendPublish")
public String sendPublish() {
rabbitmqService.sendPublish();
return "发送成功...";
}



(5)在service新增

 void sendPublish();


  @Override
public void sendPublish() {
rabbitmqMapper.sendPublish();
}


访问接口

http://localhost:8018/sendPublish


3-Topic模式


(1)在config目录下新建RabbitmqConfig文件,代码如下

  //region topic 模型
@Bean
public Queue queueTopic1() {
return new Queue("topic_queue_1");
}
@Bean
public Queue queueTopic2() {
return new Queue("topic_queue_2");
}
@Bean
public TopicExchange exchangeTopic() {
return new TopicExchange("topic_exchange");
}
@Bean
public Binding bindingTopic1() {
return BindingBuilder.bind(queueTopic1()).to(exchangeTopic()).with("topic.#");
}
@Bean
public Binding bindingTopic2() {
return BindingBuilder.bind(queueTopic2()).to(exchangeTopic()).with("topic.*");
}
//endregion


(2)在listener目录下新建TopicReceiveListener文件,内容如下

@Component
public class TopicReceiveListener {


@RabbitListener(queues = "topic_queue_1")
public void receiveMsg1(String msg) {
System.out.println("topic消费者1接收到:" + msg);
}


@RabbitListener(queues = "topic_queue_2")
public void receiveMsg2(String msg) {
System.out.println("topic消费者2接收到:" + msg);
}
}



(3)在mapper下新建RabbitmqMapper文件,内容如下:





// 向topic发送数据
public void sendTopic() {
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
rabbitTemplate.convertSendAndReceive("topic_exchange", "topic.route.topic", "测试发布订阅:" + i);
} else {
rabbitTemplate.convertSendAndReceive("topic_exchange", "topic.route", "测试发布订阅:" + i);


}
}
}


(4)新建controller层新建文件RabbitmqController



@RequestMapping("/sendTopic")
public String sendTopic() {
rabbitmqService.sendTopic();
return "发送成功...";
}



(5)在service新增



void sendTopic();


   @Override
public void sendTopic() {
rabbitmqMapper.sendTopic();
}


访问接口

http://localhost:8018/sendTopic


4-延迟队列

通常,延迟队列是有时间限制的,当到期后,通常会将消息放到另一个队列中。


(1)在config目录下新建RabbitmqConfig文件,代码如下

  //region TTL 队列  死信队列
@Value("${ttl_direct_product.user.exchange}")
private String userExchange;


@Value("${ttl_direct_product.user.queue}")
private String userQueue;


@Value("${ttl_direct_product.user.routingKey}")
private String userRoutingKey;


@Value("${ttl_direct_product.dlx.exchange}")
private String dlxExchange;


@Value("${ttl_direct_product.dlx.queue}")
private String dlxQueue;


@Value("${ttl_direct_product.dlx.routingKey}")
private String dlxRoutingKey;




/**
* 声明死信队列
* @return DirectExchange
*/
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(dlxExchange);
}


/**
* 声明死信队列
* @return Queue
*/
@Bean
public Queue dlxQueue() {
return new Queue(dlxQueue);
}


/**
* 绑定死信队列到死信交换机
* @return Binding
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(dlxQueue())
.to(dlxExchange())
.with(dlxRoutingKey);
}


/**
* 声明订单业务交换机
* @return DirectExchange
*/
@Bean
public DirectExchange userExchange() {
return new DirectExchange(userExchange);
}


/**
* 声明订单业务队列
* @return Queue
*/
@Bean
public Queue userQueue() {
Map<String,Object> arguments = new HashMap<>(2);
// 绑定该队列到私信交换机
arguments.put("x-dead-letter-exchange",dlxExchange);
arguments.put("x-dead-letter-routing-key",dlxRoutingKey);
return new Queue(userQueue,true,false,false,arguments);
}


/**
* 绑定订单队列到订单交换机
* @return Binding
*/
@Bean
public Binding userBinding() {
return BindingBuilder.bind(userQueue())
.to(userExchange())
.with(userRoutingKey);


}
//endregion


(2)在listener目录下新建DirectTTLReceiveListener文件,内容如下

@Component
public class DirectTTLReceiveListener {


@RabbitListener(queues = "ttl_direct_product.dlx.queue")
public void receiveMsg1(String msg) {
System.out.println("ttl_direct_product.dlx.queue接收到消息:" + msg);
}




}





(3)在mapper下新建RabbitmqMapper文件,内容如下:





// 向发布订阅模式里面发送消息
public void sendDirectTTL() {
for (int i = 0; i < 5; i++) {
// rabbitTemplate.convertSendAndReceive("exchange_fanout", "", "测试发布订阅:" + i);
rabbitTemplate.convertAndSend("ttl_direct_product.user.exchange", "ttl_direct_product.user.routingKey",
"测试死信队列消息"+i, message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setExpiration("10000");
return message;
});
}
}


(4)新建controller层新建文件RabbitmqController





@RequestMapping("/sendDirectTTL")
public String sendDirectTTL() {
rabbitmqService.sendDirectTTL();
return "发送成功...";
}



(5)在service新增




void sendDirectTTL();




@Override
public void sendDirectTTL() {
rabbitmqMapper.sendDirectTTL();
}


访问接口

http://localhost:8018/sendDirectTTL


5-将RabbitMQ配置放到Apollo配置中心

Apollo的安装这里就不说明了,直接讲解Apollo的配置

(1)新建一个应用

(2)填写如下内容,这里主要注意的是Appid的名字,会与应用代码中的配置有关联,命名尽量正常点,不要另类,我这里名字叫springboot-apollo-demo6


(3)

(4)填写以下内容到输入框中


spring.rabbitmq.host = 172.17.17.17
spring.rabbitmq.port = 5672
spring.rabbitmq.username = admin
spring.rabbitmq.password = admin
spring.rabbitmq.virtual-host = /
spring.rabbitmq.listener.direct.acknowledge-mode = manual

勾选确定


(5)进入上面的应用代码中

修改配置文件如下

新建一个配置 文件,名称为application-dev.yml,内容如下

server:
port: 8018
application:
name: springboot-rabbitmq

app:
id: springboot-apollo-demo6
apollo:
meta: http://172.17.17.47:8080
namespace: application


(6)再在启动程序中添加如下注入


这时,我们的RabbitMQ的配置就是从配置中心读取了



更多分享请关注我的公众号



文章转载自静思笃行的蜗牛,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论