暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

RocketMQ源码分析:RocketMQ源码环境搭建

TPVLOG 2021-06-21
1844

本文首发于Ressmix个人站点:https://www.tpvlog.com

本章是分布式消息中间件(进阶篇)的第一篇,我将开始对RocketMQ的源码进行分析,帮助大家深入理解RockectMQ的整个架构及核心功能的原理。阅读源码,最核心的一点是:理清系统架构、模块划分、模块关系,然后按照开源框架的工作流程从核心组件开始阅读,在阅读源码的过程中一定要“不求甚解”,抛开细枝末节,抓住核心主干。

一、导入源码

我们先来搭建RocketMQ的源码环境。RocketMQ的开源代码托管在GitHub上:https://github.com/apache/rocketmq,我们clone一份到自己的本地机器:

由于项目采用Maven管理,所以我们可以在根目录执行以下命令进行编译打包:

1mvn clean install -Dmaven.test.skip=true

然后,我们将Maven项目通过IDE(比如Idea)导入:

可以看到,RocketMQ源码的模块结构如下:

  • broker:存放RocketMQ的Broker相关的代码,这里的代码可以用来启动Broker进程;

  • client:存放RocketMQ的Producer、Consumer这些客户端的代码,生产消息、消费消息的代码都在里面;

  • common:存放公共代码;

  • dev:存放开发相关的一些信息;

  • distribution:存放用来部署RocketMQ的一些东西,比如bin目录 、conf目录等等;

  • example:存放RocketMQ的一些例子;

  • filter:存放RocketMQ的与过滤器相关的代码;

  • logappender和logging:存放RocketMQ的日志打印相关的东西;

  • namesrv:存放NameServer的源码;

  • openmessaging:这是开放消息标准,先忽略;

  • remoting:存放RocketMQ的远程网络通信模块的代码,基于netty实现;

  • srvutil:存放一些工具类;

  • store:存放Broker上存储相关的一些源码;

  • style、test、tools:存放checkstyle代码检查的东西,一些测试相关的类,还有就是tools里放的一些命令行监控工具类。

二、启动工程

接下来,我们来看下如何启动RocketMQ工程。在使用RocketMQ时,肯定是先启动NameServer,因为它是RocketMQ的消息路由中心;接着,肯定是启动Broker,它负责接收消息和消息的存储;最后才是启动Producer和Consumer。

2.1 启动NameServer

要启动NameServer,所以我们先在RocketMQ源码目录中找到namesrv
这个工程,然后找到NamesrvStartup.java
启动类:

NameServer启动时,会去寻找一个名为ROCKETMQ_HOME
的环境变量,这个环境变量的值是我们本机上的RocketMQ的运行时根目录。所以,我们需要先建立一个ROCKETMQ_HOME
目录,例如,我在自己本机上建立了如下根目录:

注意,我们需要在ROCKETMQ_HOME
根目录下创建conf、logs、store三个文件夹,因为后续NameServer运行是需要使用这些目录的。然后我们把RocketMQ源码目录中的distrbution
模块下的broker.conf、logback_namesrv.xml两个配置文件拷贝到上述的conf目录中去,接着就需要修改这两个配置文件的内容:

logback_namesrv.xml:修改里面的所有

 1brokerClusterName = DefaultCluster
2brokerName = broker-a
3brokerId = 0
4# 这是nameserver的地址
5namesrvAddr=127.0.0.1:9876
6deleteWhen = 04
7fileReservedTime = 48
8brokerRole = ASYNC_MASTER
9flushDiskType = ASYNC_FLUSH
10
11# 这是存储路径,你设置为你的rocketmq运行目录的store子目录
12storePathRootDir=C:\Users\Ressmix\Desktop\ROCKETMQ\store
13
14# 这是commitLog的存储路径
15storePathCommitLog=C:\Users\Ressmix\Desktop\ROCKETMQ\store/commitlog
16
17# consume queue文件的存储路径
18storePathConsumeQueue=C:\Users\Ressmix\Desktop\ROCKETMQ\store/consumequeue
19
20# 消息索引文件的存储路径
21storePathIndex=C:\Users\Ressmix\Desktop\ROCKETMQ\store/index
22
23# checkpoint文件的存储路径
24storeCheckpoint=C:\Users\Ressmix\Desktop\ROCKETMQ\store/checkpoint
25
26# abort文件的存储路径
27abortFile=C:\Users\Ressmix\Desktop\ROCKETMQ/abort


最后,启动NameServer,注意启动时,要先配置好ROCKETMQ_HOME
环境变量,可以在IDE中进行配置:

NameServer启动时会读取conf
里的配置文件,所有的日志都会打印在logs
目录里,然后数据都会写在store
目录里,启动成功之后,在Intellij IDEA的命令行里就会看到下面的提示:

1Connected to the target VM, address: '127.0.0.1:54473', transport: 'socket'
2The Name Server boot success. serializeType=JSON


2.2 启动Broker

启动Broker的方式和NameServer类似,先在RocketMQ源码目录中找到broker
这个工程,然后找到BrokerStartup.java
启动类:

Broker启动时,需要指定broker.conf
配置,这个就是ROCKETMQ_HOME/conf/broker.conf
,主要是配置了NameServer的地址,然后配置了Broker的数据存储路径:包括commitlog文件、consumequeue文件、index文件、checkpoint文件等等。

除此之外,我们还需要将RocketMQ源码目录中的distrbution
模块下的logback_broker.xml
这个配置文件拷贝到上述的conf目录中去,修改里面的所有${user.home}ROCKETMQ_HOME
根目录。

最后,启动Broker,注意要在IDE中进行配置:

启动成功之后,在Intellij IDEA的命令行里就会看到下面的提示:

1Connected to the target VM, address: '127.0.0.1:55275', transport: 'socket'
2The broker[broker-a, 192.168.3.9:10911] boot success. serializeType=JSON

三、 运行示例

NameServer和Broker都启动成功了,接着我们就可以进行消息投递和消费了。RocketMQ源码里的example
模块下提供了生产者和消费者的示例:

3.1 创建Topic

投递/订阅消息肯定要指定消息的Topic,我们先通过RocketMQ的管理控制台创建一个测试用的Topic——TopicTest

我在《RocketMQ性能压测》一章讲过如果搭建RocketMQ的管理控制台,可以参阅该章节内容进行搭建。

3.2 生产者示例

接着,我们来修改一下RocketMQ自带的Producer示例程序:

 1public class Producer {
2    public static void main(String[] args) throws MQClientException, InterruptedException {
3
4        /*
5         * Instantiate with a producer group name.
6         */

7        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
8
9        // 设置NameServer地址
10        producer.setNamesrvAddr("127.0.0.1:9876");
11
12
13        /*
14         * Launch the instance.
15         */

16        producer.start();
17
18        for (int i = 0; i < 1; i++) {
19            try {
20
21                /*
22                 * Create a message instance, specifying topic, tag and message body.
23                 */

24                Message msg = new Message("TopicTest" /* Topic */,
25                    "TagA" /* Tag */,
26                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
27                );
28
29                /*
30                 * Call send message to deliver message to one of brokers.
31                 */

32                SendResult sendResult = producer.send(msg);
33
34                System.out.printf("%s%n", sendResult);
35            } catch (Exception e) {
36                e.printStackTrace();
37                Thread.sleep(1000);
38            }
39        }
40
41        /*
42         * Shut down once the producer instance is not longer in use.
43         */

44        producer.shutdown();
45    }
46}


我们运行上面的程序就可以了,他会发送1条消息到Broker里去,我们观察一下控制台的日志打印,如果可以看到下面的内容,就说明我们已经成功的把消息发送到Broker里去了:

1SendResult[sendStatus=SEND_OK,msgId=240E03A24CD1B7A0B066027402ACC71F000018B4AAC217E3F1580000,offsetMsgId=C0A8030900002A9F0000000000000000,messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a,queueId=2],queueOffset=0]

3.3 消费者示例

接着,我们来修改一下RocketMQ自带的Consumer示例程序:

1public class Consumer {
2
3    public static void main(String[] args) throws InterruptedException, MQClientException {
4
5        /*
6         * Instantiate with specified consumer group name.
7         */

8        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
9
10        // NameServer地址
11        consumer.setNamesrvAddr("127.0.0.1:9876");
12
13        /*
14         * Specify where to start in case the specified consumer group is a brand new one.
15         */

16        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
17
18        /*
19         * Subscribe one more more topics to consume.
20         */

21        consumer.subscribe("TopicTest""*");
22
23        /*
24         *  Register callback to execute on arrival of messages fetched from brokers.
25         */

26        consumer.registerMessageListener(new MessageListenerConcurrently() {
27
28            @Override
29            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
30                ConsumeConcurrentlyContext context)
 
{
31                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
32                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
33            }
34        });
35
36        /*
37         *  Launch the consumer instance.
38         */

39        consumer.start();
40
41        System.out.printf("Consumer Started.%n");
42    }
43}


接着运行上述程序,可以看到消费到了1条消息,如下所示:

1ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=2, storeSize=225, queueOffset=0, sysFlag=0, bornTimestamp=1580887214424, bornHost=/192.168.3.9:56600, storeTimestamp=1580887214434, storeHost=/192.168.3.9:10911, msgId=C0A8030900002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1580887519080, UNIQ_KEY=240E03A24CD1B7A0B066027402ACC71F000018B4AAC217E3F1580000, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[7210110810811132821119910710111677813248], transactionId='null'}]]

到此为止,我们的RocketMQ的源码环境彻底搭建完毕了。

四、总结

本章,我们讲解了如何搭建RocketMQ的源码环境,因为RocketMQ源码本身是用Java编写的,所以整个过程还是比较简单的。从下一章开始,我们将基于搭建好的环境,开始分析RocketMQ的源码。


文章转载自TPVLOG,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论