在中大型项目开发过程中,我们往往会使用消息中间件来对各类消息进行路由与管理,已达到模块解耦的目的,降低项目复杂性,阿里巴巴在ActiveMQ的基础上,开源了RocketMQ,提供了更符合实际项目需要的各类功能包括:包括同步消息,异步消息,单向传输消息,顺序消费,定时消息,分布式事务消息等,阿里用它处理着每天几千亿的消息路由,用在包括订单,交易,充值,流计算,消息推送,binlog分发等应用场景。
概述
RocketMQ
是一个分布式消息中间件,并支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念,如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等,架构图如下:

Name Server 负责维护一些全局的路由信息:当前有哪些broker,每个Topic在哪个broker上; Broker具体处理消息的存储和服务;生产者和消费者是消息的源头和归宿。
性能上,RocketMQ
单机TPS约7w/s,部署3个Broker,可以最高跑到12w/s,消息大小10个字节,RocketMQ单机最高支持5万个队列,Load不会发生明显变化,RocketMQ
采用长轮询,同Push实时性一致,消息投递延迟在几个毫秒级别,消息失败支持定时重试,每次重试间隔时间顺延递增,失败重试的消息不会因为consumer宕机而丢失;
相比其他消息中间件,RocketMQ带来以下特性:
消息查询:指定Message Key或Message Id查询消息;
分布式事务支持;
消息回溯:支持从某个时刻重新消费消息;
定时消息;
消息顺序:支持FIFO的顺序消息消费,一台Broker宕机后,发送消息失败,不会引起乱序;
消息重试;
消息实时投递;
消息轨迹;
MQ消息监控控制台:rocketmq-console
支持丰富的消息过滤机制:通过Tags过滤,通过Java Class做任意形式的过滤,也可以做Message Body的过滤拆分;
消息堆积处理:支持亿级消息堆积;
高效的订阅者水平扩展能力,丰富的拉取消息模式;
安装
安装环境
64bit OS, Linux/Unix/Mac is recommended;
64bit JDK 1.8+;
Maven 3.2.x
Git
源码编译
1> unzip rocketmq-all-4.3.0-source-release.zip
2> cd rocketmq-all-4.3.0/
3> mvn -Prelease-all -DskipTests clean install -U
4> cd distribution/target/apache-rocketmq
启停服务
1> nohup sh bin/mqnamesrv &
2> tail -f ~/logs/rocketmqlogs/namesrv.log
3The Name Server boot success...
4> nohup sh bin/mqbroker -n localhost:9876 &
5> tail -f ~/logs/rocketmqlogs/broker.log
6The broker[%s, 172.30.30.233:10911] boot success...
7// 关闭服务
8> sh bin/mqshutdown broker
9The mqbroker(36695) is running...
10Send shutdown request to mqbroker(36695) OK
11> sh bin/mqshutdown namesrv
12The mqnamesrv(36664) is running...
13Send shutdown request to mqnamesrv(36664) OK
命令行发送消费消息
1 > export NAMESRV_ADDR=localhost:9876
2 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
3 SendResult [sendStatus=SEND_OK, msgId= ...
4 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
5 ConsumeMessageThread_%d Receive New Messages: [MessageExt...
消息队列控制台
除了消息队列控制台,其他更多的RocketMQ
扩展可以从https://github.com/apache/rocketmq-externals中找到
运维
你可以修改这个服务使用的navesvr的地址
你可以修改这个服务是否使用VIPChannel(如果你的mq server版本小于3.5.8,请设置不使用)
驾驶舱
查看broker的消息量(总量/5分钟图)
查看单一主题的消息量(总量/趋势图)
集群页面
查看集群的分布情况:cluster与broker关系、broker
查看broker具体信息/运行信息
查看broker配置信息
Topic
提供了Topic管理,筛选功能,Topic状态管理,路由信息,发送消息等功能;
Consumer
提供了消费者管理,筛选功能
发布管理
消息查询
根据Topic和时区查询,最多显示2000条;根据Topic和Key搜索;根据消息主题和消息ID查询等;

开发与使用
先定义生产者和消费者两个普通类,以此展开:
1public class RocketProducer {
2 private DefaultMQProducer producer = null;
3 private String groupName;
4 public RocketProducer(String host,String groupName) {
5 this.groupName = groupName;
6 producer = new DefaultMQProducer(groupName);
7 producer.setNamesrvAddr(host);
8 }
9 public void start() {
10 try {
11 producer.start();
12 } catch (MQClientException e) {
13 e.printStackTrace();
14 }
15 }
16 public void startAsync() {
17 try {
18 producer.start();
19 producer.setRetryTimesWhenSendAsyncFailed(0);
20 } catch (MQClientException e) {
21 e.printStackTrace();
22 }
23 }
24 public void shutdown() {
25 producer.shutdown();
26 }
27 public SendResult sendMessage(String topic,Object message) {
28 return sendMessage(topic,"",message);
29 }
30 public SendResult sendMessage(String topic,String tags,Object message) {
31 try {
32 Message msg = new Message(topic,
33 tags,
34 (JSON.toJSONString(message)).getBytes(RemotingHelper.DEFAULT_CHARSET)
35 );
36 SendResult result = producer.send(msg);
37 return result;
38 }catch(Exception ex) {
39 ex.printStackTrace();
40 }
41 return null;
42 }
43
44 public void sendMessageAsync(String topic,String tags,String order,Object message,SendCallback callback) {
45 try {
46 Message msg = new Message(topic,
47 tags,order,
48 (JSON.toJSONString(message)).getBytes(RemotingHelper.DEFAULT_CHARSET));
49 producer.send(msg, callback);
50 }catch(Exception ex) {
51 ex.printStackTrace();
52 }
53 }
54
55 public void sendMessageAsync(String topic,String tags,Object message,SendCallback callback) {
56 try {
57 Message msg = new Message(topic,
58 tags,
59 (JSON.toJSONString(message)).getBytes(RemotingHelper.DEFAULT_CHARSET));
60 producer.send(msg, callback);
61 }catch(Exception ex) {
62 ex.printStackTrace();
63 }
64 }
65
66 public void sendMessageAsync(String topic,Object message,SendCallback callback) {
67 sendMessageAsync(topic,"",message,callback);
68 }
69
70 public void sendMessageOneway(String topic,String tags,Object message) {
71 try {
72 Message msg = new Message(topic,
73 tags,
74 (JSON.toJSONString(message)).getBytes(RemotingHelper.DEFAULT_CHARSET));
75 producer.sendOneway(msg);
76 }catch(Exception ex) {
77 ex.printStackTrace();
78 }
79 }
80}
81public class RocketConsumer {
82 private String groupName;
83 private DefaultMQPushConsumer consumer;
84 public RocketConsumer(String host,String groupName) {
85 this.groupName = groupName;
86 consumer = new DefaultMQPushConsumer(groupName);
87 consumer.setNamesrvAddr(host);
88 }
89
90 public void subscribe(String topicName,MessageListenerConcurrently msgListener) {
91 try {
92 consumer.subscribe(topicName, "*");
93 consumer.registerMessageListener(msgListener);
94 consumer.start();
95 }catch(Exception ex) {
96 ex.printStackTrace();
97 }
98 }
99
100 public void broadcast(String topicName,MessageListenerConcurrently msgListener) {
101 try {
102 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
103 consumer.setMessageModel(MessageModel.BROADCASTING);
104 consumer.subscribe(topicName, "*");
105 consumer.registerMessageListener(msgListener);
106 consumer.start();
107 }catch(Exception ex) {
108 ex.printStackTrace();
109 }
110 }
111}
消费消息(含tags过滤与广播)


如果消息消费不成功,可以设置RECONSUME_LATER
,如果消息有顺序,需设置Key;
消费消息:复杂过滤条件

log4j直接输出到RocketMQ也是可以的,只需配置log4j配置文件即可;
分布式事务控制:必须实现
TransactionListener
,建立分布式事务最终一致性;
云平台应用场景
推送消息有序,并且消息不丢失
典型应用场景是数据变化通知,云平台为全公司提供基础数据变动推送服务,通过canal
侦听基础库的数据变化,将哪个表哪条记录发生变化通过到第三方URL,这些消息必须严格保证有序,否则就会引起数据严重的错乱问题,比如原推送是先删除A表记录,再删除A表,如果推送给用户收到的消息是先删除A表,再删除A表记录,那么就会引起问题,所以这里推送给第三URL的消息必须保证严格的顺序,并且消息不允许丢失,如下我们的Demo程序中,推送的10条消息到客户端是完全有序的,并且消息不丢失,即便中间第三方URL中断无法访问也不会丢失:

推送消息无序,并且消息不丢失
比如行情数据的推送,自带时间点对应哪个数据点,至于A时刻A1值先到,还是B时刻B1值先到,都不影响最终数据结果,所以这类应用最为常见,也是我们默认的推送方式,在我们的Demo程序中,消息乱序但消息不丢失,我们发送的10条消息客户端Websocket收到可能顺序是这样的:

推送消息丢失
发送一个信号即便当前URL不可用也是可忽略的推送消息,我们使用该场景,如下云平台推送了10条消息,但在客户端只收到了2条,因为期间第三方URL有中断或返回失败值的情况:

感谢阅读!
你可以继续阅读:对微服务的理解以及实现一套微服务对外发布API管理平台 | 项目开发中常用的设计模式整理 | 异构语言调用平台的设计与实现 | 大话正则表达式 | 云API平台的设计与实现 | 个税改了,工资少了,不要慌!文末附计算器
关注我们的公众号
长按识别二维码关注我们





