暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

RocketMQ相关问题

movingbrick 2021-04-15
486

1.RocketMQ Broker中的消息被消费后会立即删除吗?

不会,每条消息都会持久化到CommitLog中,每个Consumer链接到Broker后悔维持消费进度信息,当有消费后只是当前Consumer的消费进度(CommitLog中的offset)更新了

2.消息会堆积吗?什么时候清了过期消息

    1. 扫描间隔

      1. 默认10秒,由broker配置参数cleanResourceInterval决定    

    1. 空间阈值

      1. 物理文件不能无限制的一直存储在磁盘,当磁盘空间达到阈值时,不再接受消息,broker打印出日志,消息发送失败,阈值为固定值85%    

    1. 清理时机

      1. 默认每天凌晨4点,由broker配置参数deleteWhen决定;或者磁盘空间达到阈值    

    1. 文件保留时长

      1. 4.6版本默认48小时后删除不再使用的CommitLog文件,检查这个文件最后访问时间,判断是否大于过期时间,由broker配置参数fileReservedTime决定    

2.1.堆积时间过长消息超时了?

RocketMQ中的消息只会在commitLog被删除的时候才会消失,不会超时。也就是说未被消费的消息不会存在超时删除这情况。

2.2.堆积的消息会不会进死信队列?

不会,消息在消费失败后会进入重试队列(%RETRY%+ConsumerGroup),18次才会进入死信队列

public class MessageStoreConfig {
// 每隔如下时间会进行重试,到最后一次时间重试失败的话就进入死信队列了。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
}

3.RocketMQ消费模式有几种

消费模式由Consumer决定,消费维度是Topic

3.1.集群消费

    1. 一条消息只会被同一个Group中的一个Consumer消费

    2. 多个Group同时消费一个Topic时,每个Group下都会有一个

    3. 使用集群消费的时候,Consumer的消费进度是存储在broker上的,Consumer自身不存储消费进度。消息进度存储在broker上的好处在于:当Consumer集群扩大或缩小时,由于消费进度统一在broker上,消息重复的概率会被大大降低。

3.2.广播消费

消息会被所有Consumer都消费一次,即使这些Consumer属于同一个Consumer group,所以如果是在广播消费中,ConsumerGroup概念是无意义的

3.3.使用集群消费模拟广播消费

如果业务中确实要广播消费,那么可以创建多个Consumer实例,每个Consumer属于不同的ConsumerGroup,但他们都定于同一个topic就行。除此之外,每隔Consumer的消费逻辑可以不同,每个ConsumerGroup还可以根据需要增加Consumer实例,比起广播消费更灵活。

4.RocketMQ选择了拉模式消费消息

4.1.pull:拉取型消费者,主动从服务器拉取消息

    1. 优点:消费速度,数量可控

    2. 缺点:如果时间间隔较短,可能会拉空,并且频繁RPC请求会增加网络开销,如果时间间隔较长,则可能会有消息延迟

4.2.push:推送型消费者

    1. 封装了消息的拉取,消费进度和其他工作,将消息到达执时执行的回调,接口留给用户程序实现,RocketMQ没有真正意义的push,都是pull,虽然有push类,但实际底层实现采用的是长轮询机制。

    2. 优点:消息实时,保持长链接,不会频繁建立连接

    3. 缺点:消息数量过大,消费者吞吐量小,造成Consumer堆积消息,处理缓慢。

4.3.RocketMQ中的长轮询

    1. 后台会有一个RebalanceService线程,这个线程会根据topic的队列数量和当前消费组的消费者个数做负载均衡,每个队列产生的pullRequest放入阻塞队列pullRequestQueue中。然后又有个PullMessageService线程不断从pullRequestQueue队列中获取pullRequest,然后通过网络请求broker,这样实现的准实时拉取消息。

    2. consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。


5.RocketMQ如何做负载均衡

    1. 通过Topic在多Broker中分布式存储实现。NameServer保存着topic的路有信息,记录了broker集群节点的通讯地址,broker的名称以及读写队列的数量等信息。

    2. 写队列writeQueue标识生产者可以写入的队列数,默认是4,queueId为0,1,2,3。broker接收到消息后根据queueId生成消息队列,生产者负载均衡的过程实质上就是选择broker集群和queueId的过程。

    3. 读队列readQueue标识broker中可以供消费者读取消息的队列,默认是4个,queueId也是0,1,2,3。消费者拿到路由信息后选择queueId,从对应的broker中读取数据消费。

    4. rocketMQ的负载均衡都是在client中完成的,主要分为producer发送消息的负载均衡和consumer消费消息的负载均衡。

5.1.生产者的负载均衡

实质是在选择MessageQueue对象(包含了brokerName和queueId)

    1. 第一种是默认策略,从MessageQueue中随机选择一个,通过自增随机数对列表取余得到位置信息,但获得的MessageQueue所在集群不能是上次选取的集群和上次失败的集群。

    2. 第二种是超时容忍策略,先随机选择一个MessageQueue,如果因为超时等异常发送失败,会优先选择该broker下的其他MessageQueue发送,如果没有找到就从之前发送失败的Broker集群中选择一个进行发送,若还没找到,则使用默认策略

    3. 这也是RocketMQ 是能保证数据的高容错性的原因

5.2.消费者负载均衡

6种算法可以选择

    1. 平均分配算法

      1. Consumer和queue会优先平均分配,

      2. 如果Consumer少于queue的个数,则会存在部分Consumer消费多个queue的情况,

      3. 如果Consumer等于queue的个数,那就是一个Consumer消费一个queue,

      4. 如果Consumer个数大于queue的个数,那么会有部分Consumer空余出来,白白的浪费了。

    1. 环形算法

      1. 如果Consumer个数大于queue的个数,那么会有部分Consumer空余出来,白白的浪费了。

    1. 指定机房算法

    2. 就近机房算法

    3. 统一哈希算法

    4. 手动配置算法

6.如何保证消息不丢失

    1. 消息从生产到消费会经历三个阶段,生产->存储->消费

    2. 生产:producer生产消息,发送给broker

    3. 存储:存储在broker端的磁盘中

    4. 消费:consumer从broker拉取消息进行消费

6.1.producer生产阶段:

    1. 生产者通过网络发送消息给 Broker,当 Broker 收到之后,将会返回确认响应信息给 Producer。所以生产者只要接收到返回的确认响应,就代表消息在生产阶段未丢失。

    2. send方法可以同步发送,只要这个方法不抛出任何异常,就代表消息已经发送成功。

    3. 消息发送成功代表到了broker端,broker配置不同会响应不同状态

    4. RocketMQ 还提供异步的发送的方式,适合于耗时较长的操作,对响应时间较为敏感的业务场景。

    5. 异步发送消息一定要注意重写回调方法,在回调方法中检查发送结果。

    6. 不管是同步还是异步的方式,都会碰到网络问题导致发送失败的情况。针对这种情况,我们可以设置合理的重试次数,当出现网络问题,可以自动重试。设置方式如代码3:

//代码1
//SEND_OK并不意味着它是可靠的。要确保不会丢失任何消息,还应启用SYNC_MASTER或SYNC_FLUSH。
SendStatus.SEND_OK
//如果Broker设置MessageStoreConfig的FlushDiskType = SYNC_FLUSH(默认为ASYNC_FLUSH),
//并且Broker没有在MessageStoreConfig的syncFlushTimeout(默认为5秒)内完成刷新磁盘,返回此状态。
SendStatus.FLUSH_DISK_TIMEOUT
//如果Broker的角色是SYNC_MASTER(默认为ASYNC_MASTER),
//并且从属Broker未在MessageStoreConfig的syncFlushTimeout(默认为5秒)内完成与主服务器的同步,返回此状态。
SendStatus.FLUSH_SLAVE_TIMEOUT
//如果Broker的角色是SYNC_MASTER(默认为ASYNC_MASTER),但没有配置slave Broker,返回此状态。
SendStatus.SLAVE_NOT_AVAILABLE


//代码2
public void init() throws MQClientException {
defaultMQProducer = new DefaultMQProducer(producerGroup);
defaultMQProducer.setNamesrvAddr(namesrvAddr);
defaultMQProducer.setInstanceName(String.valueOf(System.currentTimeMillis()));
defaultMQProducer.start();
log.info("[ MessageProducer ] start success! producerGroup:{}, namesrvAddr:{} ~~~",
producerGroup, namesrvAddr);
}
/**
* 生产消息
* @param msg
* @return
*/
public SendResult send(Message msg) {
SendResult sendResult = null;
try {
String topic = msg.getTopic();
String tags = msg.getTags();
String body = new String(msg.getBody());
sendResult = defaultMQProducer.send(msg);
log.info("[ MessageProducer ] sendStatus:{}, topic:{}, tags:{}, body:{}, time:{}",
sendResult.getSendStatus(), topic, tags, body, DateTimeUtil.standardFormat(new Date()));
} catch (MQClientException e) {
log.error("[ MQClientException ]", e);
} catch (RemotingException e) {
log.error("[ RemotingException ]", e);
} catch (MQBrokerException e) {
log.error("[ MQBrokerException ]", e);
} catch (InterruptedException e) {
log.error("[ InterruptedException ]", e);
}
return sendResult;
}


//代码3
//设置同步发送失败重试次数
defaultMQProducer.setRetryTimesWhenSendFailed(2);
//设置异步发送失败重试次数
defaultMQProducer.setRetryTimesWhenSendAsyncFailed(2);

6.2.Broker存储阶段

6.2.1.异步刷盘与同步刷盘

    1. 默认情况下,消息只要到了 Broker 端,将会优先保存到内存中,然后立刻返回确认响应给生产者。随后 Broker 定期批量的将一组消息从内存异步刷入磁盘。这种方式减少 I/O 次数,可以取得更好的性能,但是如果发生机器掉电,异常宕机等情况,消息还未及时刷入磁盘,就会出现丢失消息的情况。若想保证 Broker 端不丢消息,保证消息的可靠性,我们需要将消息保存机制修改为同步刷盘方式,即消息存储磁盘成功,才会返回响应。

    2. 默认情况异步刷盘flushDiskType = ASYNC_FLUSH 同步刷盘flushDiskType = SYNC_FLUSH

    3. 若 Broker 未在同步刷盘时间内(默认为 5s)完成刷盘,将返回 SendStatus.FLUSH_DISK_TIMEOUT 状态给生产者

6.2.2.集群部署

    1. 为了保证可用性,Broker 通常采用一主(master)多从(slave)部署方式。为了保证消息不丢失,消息还需要复制到 slave 节点。默认方式下,消息写入 master成功,就可以返回确认响应给生产者,接着消息将会异步复制到 slave 节点。

    2. 此时master 配置:flushDiskType = SYNC_FLUSH 要是同步刷盘方式

    3. 此时若 master 突然宕机且不可恢复,那么还未复制到 slave的消息将会丢失。

    4. 为了进一步提高消息的可靠性,我们可以采用同步的复制方式,master节点将会同步等待 slave 节点复制完成,才会返回确认响应。

      1. Broker master 节点 同步复制配置如下:ASYNC_MASTER brokerRole = SYNC_MASTER

      2. 如果 slave节点未在指定时间内同步返回响应,生产者将会收到SendStatus.FLUSH_SLAVE_TIMEOUT 返回状态。

6.3.consumer消费阶段

    1. 消费者从 broker 拉取消息,然后执行相应的业务逻辑。一旦执行成功,将会返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 状态给 Broker。

    2. 如果 Broker 未收到消费确认响应或收到其他状态,消费者下次还会再次拉取到该条消息,进行重试,有重试策略和次数(默认18次)。这样的方式有效避免了消费者消费过程发生异常,或者消息在网络传输中丢失的情况。

    3. 注意:消费端的消息重试机制一定要在集群消费模式下才有效,广播消费模式下是不会进行重试的,只会消费一次不管成不成功。

7.处理重复消息

7.1.原因

    1. producer没收到broker的ack,导致重发

    2. consumer消费成功但是ack失败,导致重复消费

7.2.解决方法

    1. 幂等性,不管逻辑代码执行多少次,每次结果都一样

    2. 给每条消息都分配一个唯一id,保证处理成功和去重表的日志同时出现,如果处理成功则不需要再处理了

    3. 给每条消息增加一个状态标识,标识未处理,正在处理,已处理。

8.如何保证顺序消费

8.1.现状

同一个topic下存在多个队列,producer会在默认发送消息时会轮询选择一个队列进行发送,会导致消息分散到多个队列,多个queue同时消费是无法绝对保证消息的有序性的。broker上的消息只有在同一个队列中消息才是顺序读取的。consumer消费消息时,每一个consumer单独消费broker上的一个队列,一般情况下一个consumer一个进程,不同进程不能保证顺序,并且实际上可能一个consumer有多个线程。

8.2.解决思路

8.2.1.producer端

同一topic只有一个QUEUE,会降低性能,发消息的时候一个线程去发送消息。

8.2.2.consumer端

消费的时候 一个线程顺序消费一个queue里的消息。

但是问题可以转换为对同一个共享资源串行访问或者处理

一般就是加锁处理,broker上的队列消息在被consumer消费时加锁,单个consumer端多线程并发处理消息时加锁,并且还要注意锁异常的情况。

8.2.3.源码分析放单独一节

9.如何保证消息发送到同一个queue

实现MessageQueueSelector接口,自定义算法

DefaultMQProducer defaultMQProducer = new DefaultMQProducer("");
//发送的消息
Message message = new Message();
//自定义的参数
String arg = "2";
defaultMQProducer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
//计算一个queue,这里就是向第一个queue里写消息
if (Integer.parseInt(o.toString()) % 2 == 0){
return list.get(0);
}else {
return list.get(1);
}
}
}, arg);


上一篇: RocketMQ介绍

下一篇: JVM运行时内存区域







文章转载自movingbrick,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论