暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

RocketMQ 源码系列 [producer解析] RocketMQ 源码系列 [producer解析]

小源学源码 2021-11-14
1170

producer在RocketMQ 扮演的角色是消息的发送过程,其实宏观上来讲其实就包括两大块,分别是[消息的发送]以及[一堆定时任务]。producer在发送消息过程中涉及到发送队列的选择(topic、broker、queue),所以只要在整个发送过程中理清楚这几者之间的关系,就能理解整个发送过程。

大致的发送过程的时序图如下:

1、producer是如何创建出来的
/*
 * Instantiate with a producer group name.就是创建一个DefaultMQProducer对象实例,在其中传入你所属的Producer分组,然后设置一下NameServer的地址,最后调用他的start()方法,启动这个Producer就可以了。
 */

DefaultMQProducer producer =
             new DefaultMQProducer("please_rename_unique_group_name");

producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");

producer.start();

1.1、start()初始化方法具体做了什么事情
/**
 * Start this producer instance. </p>
 *
 * <strong> Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must
 * to invoke this method before sending or querying messages. </strong> </p>
 *
 * @throws MQClientException if there is any unexpected error.
 */

@Override
public void start() throws MQClientException {
    //处理生产者组
    this.setProducerGroup(withNamespace(this.producerGroup));
    //调用实现的方法
    this.defaultMQProducerImpl.start();
    //消息的链路跟踪
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

this.defaultMQProducerImpl.start()
public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
//检查配置
this.checkConfig();
//本地注册producer客户端
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook); 
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

//如果没有注册成功抛出异常
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException
}

//保存空的TopicPublishInfo信息this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

//启动producer
if (startFactory) {
  mQClientFactory.start();
}
//打印日志
log.info()
break;
}
 //给所有的broker发送心跳
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                //扫描过期的请求
                RequestFutureTable.scanExpiredRequest();
            } catch (Throwable e) {
                log.error("scan RequestFutureTable exception", e);
            }
        }
    }, 1000 * 31000);
}

mQClientFactory.start();
//启动producer
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                // Start request-response channel(和broker建立定时连接)
                this.mQClientAPIImpl.start();
                // Start various schedule tasks(启动定时任务)
                this.startScheduledTask();
                // Start pull service(启动消息拉取任务)
                this.pullMessageService.start();
                // Start rebalance service(启动负载均衡任务)
                this.rebalanceService.start();
                // Start push service(启动消息推送任务)
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
        }
    }
}

可以参考下面的时序图来理解这部分源码

2、访问namesrv获取topicpublicinfo、选择MessageQueue发送消息

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

这行代码发送消息的时候,他都会先去检查一下,这个你要发送消息的Topic的路由数据是否在你客户端本地如果不在的话,必然会发送请求到NameServer那里去拉取一下的,然后缓存在客户端本地。


MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

这行代码其实就是在选择Topic中的一个MessageQueue,然后发送消息到这个MessageQueue去,在这行代码里面,实现了一些Broker故障自动回避机制,他核心的就是用这个index对Topic的MessageQueue列表进行了取模操作,获取到了一个MessageQueue列表的位置,然后返回了这个位置的MessageQueue



3、调用sendKernelImpl实际发送消息
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());


是通过brokerName去本地缓存找他的实际的地址,如果找不到,就去找NameServer拉取Topic的路由数据


然后是封装消息来发送Broker,包括了给消息分配全局唯一ID、对超过4KB的消息体进行压缩,在消息Request中包含了生产者组、Topic名称、Topic的MessageQueue数量、MessageQueue的ID、消息发送时间、消息的flag、消息扩展属性、消息重试次数、是否是批量发送的消息、如果是事务消息则带上prepared标记


private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    //是通过brokerName去本地缓存找他的实际的地址,如果找不到,就去找NameServer拉取Topic的路由数据
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }
    //要发送的信息
    SendMessageContext context = null;
    if (brokerAddr != null) {
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
        //使用字节数组保存消息的body
        byte[] prevBody = msg.getBody();
        try {
            //for MessageBatch,ID has been set in the generating process
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }

            boolean topicWithNamespace = false;
            if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                topicWithNamespace = true;
            }

            int sysFlag = 0;
            boolean msgBodyCompressed = false;

            //对超过4k的消息体进行压缩
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                msgBodyCompressed = true;
            }

            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }

            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }

            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                context.setNamespace(this.defaultMQProducer.getNamespace());
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }

                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }

            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }

                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }

            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        msg.setBody(prevBody);
                    }

                    if (topicWithNamespace) {
                        if (!messageCloned) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                    }

                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    //去封装了一个Request请求出
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
         
            }

       

            return sendResult;
        } 
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}


文章转载自小源学源码,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论