1.RocketMQ Broker中的消息被消费后会立即删除吗?
不会,每条消息都会持久化到CommitLog中,每个Consumer链接到Broker后悔维持消费进度信息,当有消费后只是当前Consumer的消费进度(CommitLog中的offset)更新了
2.消息会堆积吗?什么时候清了过期消息
扫描间隔
默认10秒,由broker配置参数cleanResourceInterval决定
空间阈值
物理文件不能无限制的一直存储在磁盘,当磁盘空间达到阈值时,不再接受消息,broker打印出日志,消息发送失败,阈值为固定值85%
清理时机
默认每天凌晨4点,由broker配置参数deleteWhen决定;或者磁盘空间达到阈值
文件保留时长
4.6版本默认48小时后删除不再使用的CommitLog文件,检查这个文件最后访问时间,判断是否大于过期时间,由broker配置参数fileReservedTime决定
2.1.堆积时间过长消息超时了?
RocketMQ中的消息只会在commitLog被删除的时候才会消失,不会超时。也就是说未被消费的消息不会存在超时删除这情况。
2.2.堆积的消息会不会进死信队列?
不会,消息在消费失败后会进入重试队列(%RETRY%+ConsumerGroup),18次才会进入死信队列
public class MessageStoreConfig {// 每隔如下时间会进行重试,到最后一次时间重试失败的话就进入死信队列了。private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";}
3.RocketMQ消费模式有几种
消费模式由Consumer决定,消费维度是Topic
3.1.集群消费
一条消息只会被同一个Group中的一个Consumer消费
多个Group同时消费一个Topic时,每个Group下都会有一个
使用集群消费的时候,Consumer的消费进度是存储在broker上的,Consumer自身不存储消费进度。消息进度存储在broker上的好处在于:当Consumer集群扩大或缩小时,由于消费进度统一在broker上,消息重复的概率会被大大降低。
3.2.广播消费
消息会被所有Consumer都消费一次,即使这些Consumer属于同一个Consumer group,所以如果是在广播消费中,ConsumerGroup概念是无意义的
3.3.使用集群消费模拟广播消费
如果业务中确实要广播消费,那么可以创建多个Consumer实例,每个Consumer属于不同的ConsumerGroup,但他们都定于同一个topic就行。除此之外,每隔Consumer的消费逻辑可以不同,每个ConsumerGroup还可以根据需要增加Consumer实例,比起广播消费更灵活。
4.RocketMQ选择了拉模式消费消息
4.1.pull:拉取型消费者,主动从服务器拉取消息
优点:消费速度,数量可控
缺点:如果时间间隔较短,可能会拉空,并且频繁RPC请求会增加网络开销,如果时间间隔较长,则可能会有消息延迟
4.2.push:推送型消费者
封装了消息的拉取,消费进度和其他工作,将消息到达执时执行的回调,接口留给用户程序实现,RocketMQ没有真正意义的push,都是pull,虽然有push类,但实际底层实现采用的是长轮询机制。
优点:消息实时,保持长链接,不会频繁建立连接
缺点:消息数量过大,消费者吞吐量小,造成Consumer堆积消息,处理缓慢。
4.3.RocketMQ中的长轮询
后台会有一个RebalanceService线程,这个线程会根据topic的队列数量和当前消费组的消费者个数做负载均衡,每个队列产生的pullRequest放入阻塞队列pullRequestQueue中。然后又有个PullMessageService线程不断从pullRequestQueue队列中获取pullRequest,然后通过网络请求broker,这样实现的准实时拉取消息。
consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。

5.RocketMQ如何做负载均衡
通过Topic在多Broker中分布式存储实现。NameServer保存着topic的路有信息,记录了broker集群节点的通讯地址,broker的名称以及读写队列的数量等信息。
写队列writeQueue标识生产者可以写入的队列数,默认是4,queueId为0,1,2,3。broker接收到消息后根据queueId生成消息队列,生产者负载均衡的过程实质上就是选择broker集群和queueId的过程。
读队列readQueue标识broker中可以供消费者读取消息的队列,默认是4个,queueId也是0,1,2,3。消费者拿到路由信息后选择queueId,从对应的broker中读取数据消费。
rocketMQ的负载均衡都是在client中完成的,主要分为producer发送消息的负载均衡和consumer消费消息的负载均衡。
5.1.生产者的负载均衡
实质是在选择MessageQueue对象(包含了brokerName和queueId)
第一种是默认策略,从MessageQueue中随机选择一个,通过自增随机数对列表取余得到位置信息,但获得的MessageQueue所在集群不能是上次选取的集群和上次失败的集群。
第二种是超时容忍策略,先随机选择一个MessageQueue,如果因为超时等异常发送失败,会优先选择该broker下的其他MessageQueue发送,如果没有找到就从之前发送失败的Broker集群中选择一个进行发送,若还没找到,则使用默认策略
这也是RocketMQ 是能保证数据的高容错性的原因
5.2.消费者负载均衡
6种算法可以选择
平均分配算法
Consumer和queue会优先平均分配,
如果Consumer少于queue的个数,则会存在部分Consumer消费多个queue的情况,
如果Consumer等于queue的个数,那就是一个Consumer消费一个queue,
如果Consumer个数大于queue的个数,那么会有部分Consumer空余出来,白白的浪费了。
环形算法
如果Consumer个数大于queue的个数,那么会有部分Consumer空余出来,白白的浪费了。
指定机房算法
就近机房算法
统一哈希算法
手动配置算法
6.如何保证消息不丢失
消息从生产到消费会经历三个阶段,生产->存储->消费
生产:producer生产消息,发送给broker
存储:存储在broker端的磁盘中
消费:consumer从broker拉取消息进行消费
6.1.producer生产阶段:
生产者通过网络发送消息给 Broker,当 Broker 收到之后,将会返回确认响应信息给 Producer。所以生产者只要接收到返回的确认响应,就代表消息在生产阶段未丢失。
send方法可以同步发送,只要这个方法不抛出任何异常,就代表消息已经发送成功。
消息发送成功代表到了broker端,broker配置不同会响应不同状态
RocketMQ 还提供异步的发送的方式,适合于耗时较长的操作,对响应时间较为敏感的业务场景。
异步发送消息一定要注意重写回调方法,在回调方法中检查发送结果。
不管是同步还是异步的方式,都会碰到网络问题导致发送失败的情况。针对这种情况,我们可以设置合理的重试次数,当出现网络问题,可以自动重试。设置方式如代码3:
//代码1//SEND_OK并不意味着它是可靠的。要确保不会丢失任何消息,还应启用SYNC_MASTER或SYNC_FLUSH。SendStatus.SEND_OK//如果Broker设置MessageStoreConfig的FlushDiskType = SYNC_FLUSH(默认为ASYNC_FLUSH),//并且Broker没有在MessageStoreConfig的syncFlushTimeout(默认为5秒)内完成刷新磁盘,返回此状态。SendStatus.FLUSH_DISK_TIMEOUT//如果Broker的角色是SYNC_MASTER(默认为ASYNC_MASTER),//并且从属Broker未在MessageStoreConfig的syncFlushTimeout(默认为5秒)内完成与主服务器的同步,返回此状态。SendStatus.FLUSH_SLAVE_TIMEOUT//如果Broker的角色是SYNC_MASTER(默认为ASYNC_MASTER),但没有配置slave Broker,返回此状态。SendStatus.SLAVE_NOT_AVAILABLE//代码2public void init() throws MQClientException {defaultMQProducer = new DefaultMQProducer(producerGroup);defaultMQProducer.setNamesrvAddr(namesrvAddr);defaultMQProducer.setInstanceName(String.valueOf(System.currentTimeMillis()));defaultMQProducer.start();log.info("[ MessageProducer ] start success! producerGroup:{}, namesrvAddr:{} ~~~",producerGroup, namesrvAddr);}/*** 生产消息* @param msg* @return*/public SendResult send(Message msg) {SendResult sendResult = null;try {String topic = msg.getTopic();String tags = msg.getTags();String body = new String(msg.getBody());sendResult = defaultMQProducer.send(msg);log.info("[ MessageProducer ] sendStatus:{}, topic:{}, tags:{}, body:{}, time:{}",sendResult.getSendStatus(), topic, tags, body, DateTimeUtil.standardFormat(new Date()));} catch (MQClientException e) {log.error("[ MQClientException ]", e);} catch (RemotingException e) {log.error("[ RemotingException ]", e);} catch (MQBrokerException e) {log.error("[ MQBrokerException ]", e);} catch (InterruptedException e) {log.error("[ InterruptedException ]", e);}return sendResult;}//代码3//设置同步发送失败重试次数defaultMQProducer.setRetryTimesWhenSendFailed(2);//设置异步发送失败重试次数defaultMQProducer.setRetryTimesWhenSendAsyncFailed(2);
6.2.Broker存储阶段
6.2.1.异步刷盘与同步刷盘
默认情况下,消息只要到了 Broker 端,将会优先保存到内存中,然后立刻返回确认响应给生产者。随后 Broker 定期批量的将一组消息从内存异步刷入磁盘。这种方式减少 I/O 次数,可以取得更好的性能,但是如果发生机器掉电,异常宕机等情况,消息还未及时刷入磁盘,就会出现丢失消息的情况。若想保证 Broker 端不丢消息,保证消息的可靠性,我们需要将消息保存机制修改为同步刷盘方式,即消息存储磁盘成功,才会返回响应。
默认情况异步刷盘flushDiskType = ASYNC_FLUSH 同步刷盘flushDiskType = SYNC_FLUSH
若 Broker 未在同步刷盘时间内(默认为 5s)完成刷盘,将返回 SendStatus.FLUSH_DISK_TIMEOUT 状态给生产者
6.2.2.集群部署
为了保证可用性,Broker 通常采用一主(master)多从(slave)部署方式。为了保证消息不丢失,消息还需要复制到 slave 节点。默认方式下,消息写入 master成功,就可以返回确认响应给生产者,接着消息将会异步复制到 slave 节点。
此时master 配置:flushDiskType = SYNC_FLUSH 要是同步刷盘方式
此时若 master 突然宕机且不可恢复,那么还未复制到 slave的消息将会丢失。
为了进一步提高消息的可靠性,我们可以采用同步的复制方式,master节点将会同步等待 slave 节点复制完成,才会返回确认响应。
Broker master 节点 同步复制配置如下:ASYNC_MASTER brokerRole = SYNC_MASTER
如果 slave节点未在指定时间内同步返回响应,生产者将会收到SendStatus.FLUSH_SLAVE_TIMEOUT 返回状态。
6.3.consumer消费阶段
消费者从 broker 拉取消息,然后执行相应的业务逻辑。一旦执行成功,将会返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 状态给 Broker。
如果 Broker 未收到消费确认响应或收到其他状态,消费者下次还会再次拉取到该条消息,进行重试,有重试策略和次数(默认18次)。这样的方式有效避免了消费者消费过程发生异常,或者消息在网络传输中丢失的情况。
注意:消费端的消息重试机制一定要在集群消费模式下才有效,广播消费模式下是不会进行重试的,只会消费一次不管成不成功。
7.处理重复消息
7.1.原因
producer没收到broker的ack,导致重发
consumer消费成功但是ack失败,导致重复消费
7.2.解决方法
幂等性,不管逻辑代码执行多少次,每次结果都一样
给每条消息都分配一个唯一id,保证处理成功和去重表的日志同时出现,如果处理成功则不需要再处理了
给每条消息增加一个状态标识,标识未处理,正在处理,已处理。
8.如何保证顺序消费
8.1.现状
同一个topic下存在多个队列,producer会在默认发送消息时会轮询选择一个队列进行发送,会导致消息分散到多个队列,多个queue同时消费是无法绝对保证消息的有序性的。broker上的消息只有在同一个队列中消息才是顺序读取的。consumer消费消息时,每一个consumer单独消费broker上的一个队列,一般情况下一个consumer一个进程,不同进程不能保证顺序,并且实际上可能一个consumer有多个线程。
8.2.解决思路
8.2.1.producer端
同一topic只有一个QUEUE,会降低性能,发消息的时候一个线程去发送消息。
8.2.2.consumer端
消费的时候 一个线程去顺序消费一个queue里的消息。
但是问题可以转换为对同一个共享资源串行访问或者处理
一般就是加锁处理,broker上的队列消息在被consumer消费时加锁,单个consumer端多线程并发处理消息时加锁,并且还要注意锁异常的情况。
8.2.3.源码分析放单独一节
9.如何保证消息发送到同一个queue
实现MessageQueueSelector接口,自定义算法
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("");//发送的消息Message message = new Message();//自定义的参数String arg = "2";defaultMQProducer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {//计算一个queue,这里就是向第一个queue里写消息if (Integer.parseInt(o.toString()) % 2 == 0){return list.get(0);}else {return list.get(1);}}}, arg);




