
本文首发于Ressmix个人站点:https://www.tpvlog.com
本章是分布式消息中间件(进阶篇)的第一篇,我将开始对RocketMQ的源码进行分析,帮助大家深入理解RockectMQ的整个架构及核心功能的原理。阅读源码,最核心的一点是:理清系统架构、模块划分、模块关系,然后按照开源框架的工作流程从核心组件开始阅读,在阅读源码的过程中一定要“不求甚解”,抛开细枝末节,抓住核心主干。
一、导入源码
我们先来搭建RocketMQ的源码环境。RocketMQ的开源代码托管在GitHub上:https://github.com/apache/rocketmq,我们clone一份到自己的本地机器:

由于项目采用Maven管理,所以我们可以在根目录执行以下命令进行编译打包:
1mvn clean install -Dmaven.test.skip=true
然后,我们将Maven项目通过IDE(比如Idea)导入:

可以看到,RocketMQ源码的模块结构如下:
broker:存放RocketMQ的Broker相关的代码,这里的代码可以用来启动Broker进程;
client:存放RocketMQ的Producer、Consumer这些客户端的代码,生产消息、消费消息的代码都在里面;
common:存放公共代码;
dev:存放开发相关的一些信息;
distribution:存放用来部署RocketMQ的一些东西,比如bin目录 、conf目录等等;
example:存放RocketMQ的一些例子;
filter:存放RocketMQ的与过滤器相关的代码;
logappender和logging:存放RocketMQ的日志打印相关的东西;
namesrv:存放NameServer的源码;
openmessaging:这是开放消息标准,先忽略;
remoting:存放RocketMQ的远程网络通信模块的代码,基于netty实现;
srvutil:存放一些工具类;
store:存放Broker上存储相关的一些源码;
style、test、tools:存放checkstyle代码检查的东西,一些测试相关的类,还有就是tools里放的一些命令行监控工具类。
二、启动工程
接下来,我们来看下如何启动RocketMQ工程。在使用RocketMQ时,肯定是先启动NameServer,因为它是RocketMQ的消息路由中心;接着,肯定是启动Broker,它负责接收消息和消息的存储;最后才是启动Producer和Consumer。
2.1 启动NameServer
要启动NameServer,所以我们先在RocketMQ源码目录中找到namesrv
这个工程,然后找到NamesrvStartup.java
启动类:

NameServer启动时,会去寻找一个名为ROCKETMQ_HOME
的环境变量,这个环境变量的值是我们本机上的RocketMQ的运行时根目录。所以,我们需要先建立一个ROCKETMQ_HOME
目录,例如,我在自己本机上建立了如下根目录:

注意,我们需要在ROCKETMQ_HOME
根目录下创建conf、logs、store三个文件夹,因为后续NameServer运行是需要使用这些目录的。然后我们把RocketMQ源码目录中的distrbution
模块下的broker.conf、logback_namesrv.xml两个配置文件拷贝到上述的conf目录中去,接着就需要修改这两个配置文件的内容:
logback_namesrv.xml:修改里面的所有
1brokerClusterName = DefaultCluster
2brokerName = broker-a
3brokerId = 0
4# 这是nameserver的地址
5namesrvAddr=127.0.0.1:9876
6deleteWhen = 04
7fileReservedTime = 48
8brokerRole = ASYNC_MASTER
9flushDiskType = ASYNC_FLUSH
10
11# 这是存储路径,你设置为你的rocketmq运行目录的store子目录
12storePathRootDir=C:\Users\Ressmix\Desktop\ROCKETMQ\store
13
14# 这是commitLog的存储路径
15storePathCommitLog=C:\Users\Ressmix\Desktop\ROCKETMQ\store/commitlog
16
17# consume queue文件的存储路径
18storePathConsumeQueue=C:\Users\Ressmix\Desktop\ROCKETMQ\store/consumequeue
19
20# 消息索引文件的存储路径
21storePathIndex=C:\Users\Ressmix\Desktop\ROCKETMQ\store/index
22
23# checkpoint文件的存储路径
24storeCheckpoint=C:\Users\Ressmix\Desktop\ROCKETMQ\store/checkpoint
25
26# abort文件的存储路径
27abortFile=C:\Users\Ressmix\Desktop\ROCKETMQ/abort
最后,启动NameServer,注意启动时,要先配置好ROCKETMQ_HOME
环境变量,可以在IDE中进行配置:

NameServer启动时会读取conf
里的配置文件,所有的日志都会打印在logs
目录里,然后数据都会写在store
目录里,启动成功之后,在Intellij IDEA的命令行里就会看到下面的提示:
1Connected to the target VM, address: '127.0.0.1:54473', transport: 'socket'
2The Name Server boot success. serializeType=JSON
2.2 启动Broker
启动Broker的方式和NameServer类似,先在RocketMQ源码目录中找到broker
这个工程,然后找到BrokerStartup.java
启动类:

Broker启动时,需要指定broker.conf
配置,这个就是ROCKETMQ_HOME/conf/broker.conf
,主要是配置了NameServer的地址,然后配置了Broker的数据存储路径:包括commitlog文件、consumequeue文件、index文件、checkpoint文件等等。
除此之外,我们还需要将RocketMQ源码目录中的distrbution
模块下的logback_broker.xml
这个配置文件拷贝到上述的conf目录中去,修改里面的所有${user.home},替换为你的ROCKETMQ_HOME
根目录。
最后,启动Broker,注意要在IDE中进行配置:

启动成功之后,在Intellij IDEA的命令行里就会看到下面的提示:
1Connected to the target VM, address: '127.0.0.1:55275', transport: 'socket'
2The broker[broker-a, 192.168.3.9:10911] boot success. serializeType=JSON
三、 运行示例
NameServer和Broker都启动成功了,接着我们就可以进行消息投递和消费了。RocketMQ源码里的example
模块下提供了生产者和消费者的示例:

3.1 创建Topic
投递/订阅消息肯定要指定消息的Topic,我们先通过RocketMQ的管理控制台创建一个测试用的Topic——TopicTest
。

我在《RocketMQ性能压测》一章讲过如果搭建RocketMQ的管理控制台,可以参阅该章节内容进行搭建。
3.2 生产者示例
接着,我们来修改一下RocketMQ自带的Producer示例程序:
1public class Producer {
2 public static void main(String[] args) throws MQClientException, InterruptedException {
3
4 /*
5 * Instantiate with a producer group name.
6 */
7 DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
8
9 // 设置NameServer地址
10 producer.setNamesrvAddr("127.0.0.1:9876");
11
12
13 /*
14 * Launch the instance.
15 */
16 producer.start();
17
18 for (int i = 0; i < 1; i++) {
19 try {
20
21 /*
22 * Create a message instance, specifying topic, tag and message body.
23 */
24 Message msg = new Message("TopicTest" /* Topic */,
25 "TagA" /* Tag */,
26 ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
27 );
28
29 /*
30 * Call send message to deliver message to one of brokers.
31 */
32 SendResult sendResult = producer.send(msg);
33
34 System.out.printf("%s%n", sendResult);
35 } catch (Exception e) {
36 e.printStackTrace();
37 Thread.sleep(1000);
38 }
39 }
40
41 /*
42 * Shut down once the producer instance is not longer in use.
43 */
44 producer.shutdown();
45 }
46}
我们运行上面的程序就可以了,他会发送1条消息到Broker里去,我们观察一下控制台的日志打印,如果可以看到下面的内容,就说明我们已经成功的把消息发送到Broker里去了:
1SendResult[sendStatus=SEND_OK,msgId=240E03A24CD1B7A0B066027402ACC71F000018B4AAC217E3F1580000,offsetMsgId=C0A8030900002A9F0000000000000000,messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a,queueId=2],queueOffset=0]
3.3 消费者示例
接着,我们来修改一下RocketMQ自带的Consumer示例程序:
1public class Consumer {
2
3 public static void main(String[] args) throws InterruptedException, MQClientException {
4
5 /*
6 * Instantiate with specified consumer group name.
7 */
8 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
9
10 // NameServer地址
11 consumer.setNamesrvAddr("127.0.0.1:9876");
12
13 /*
14 * Specify where to start in case the specified consumer group is a brand new one.
15 */
16 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
17
18 /*
19 * Subscribe one more more topics to consume.
20 */
21 consumer.subscribe("TopicTest", "*");
22
23 /*
24 * Register callback to execute on arrival of messages fetched from brokers.
25 */
26 consumer.registerMessageListener(new MessageListenerConcurrently() {
27
28 @Override
29 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
30 ConsumeConcurrentlyContext context) {
31 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
32 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
33 }
34 });
35
36 /*
37 * Launch the consumer instance.
38 */
39 consumer.start();
40
41 System.out.printf("Consumer Started.%n");
42 }
43}
接着运行上述程序,可以看到消费到了1条消息,如下所示:
1ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=2, storeSize=225, queueOffset=0, sysFlag=0, bornTimestamp=1580887214424, bornHost=/192.168.3.9:56600, storeTimestamp=1580887214434, storeHost=/192.168.3.9:10911, msgId=C0A8030900002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1580887519080, UNIQ_KEY=240E03A24CD1B7A0B066027402ACC71F000018B4AAC217E3F1580000, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]
到此为止,我们的RocketMQ的源码环境彻底搭建完毕了。
四、总结
本章,我们讲解了如何搭建RocketMQ的源码环境,因为RocketMQ源码本身是用Java编写的,所以整个过程还是比较简单的。从下一章开始,我们将基于搭建好的环境,开始分析RocketMQ的源码。




