暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

First, easy demo, Simple!

方家小白 2021-11-27
157

首先我们来学习一个简单的消费生产和消费的demo
.

生产消息

RocketMQ
, 给我们提供了三种简单的消息生产方式,1.同步发送
2.异步发送
3.直接发送,不关心发送结果

这三种方式,分别对应三种不同的应用场景

同步发送

这种可靠同步的发送方式使用的比较广泛,比如:重要的消息通知,短信通知。这种发送方式,在发送成功之后,才会返回,否则会一直阻塞,直到抛出异常。

public class SyncProducer {
 public static void main(String[] args) throws Exception {
     // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
     // 设置NameServer的地址
     producer.setNamesrvAddr("192.168.1.65:9876");
     // 启动Producer实例
        producer.start();
     for (int i = 0; i < 100; i++) {
         // 创建消息,并指定Topic,Tag和消息体
         Message msg = new Message("TopicTest" /* Topic */,
         "TagA" /* Tag */,
         ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
         );
         // 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
     }
     // 如果不再发送消息,关闭Producer实例。
     producer.shutdown();
    }
}

异步发送

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker
的响应。

public class AsyncProducer {
 public static void main(String[] args) throws Exception {
     // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
     // 设置NameServer的地址
        producer.setNamesrvAddr("192.168.1.65:9876");
     // 启动Producer实例
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
 
 int messageCount = 100;
        // 根据消息数量实例化倒计时计算器
 final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
     for (int i = 0; i < messageCount; i++) {
                final int index = i;
             // 创建消息,并指定Topic,Tag和消息体
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // SendCallback接收异步返回结果的回调
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.printf("%-10d OK %s %n", index,
                            sendResult.getMsgId());
                    }
                    @Override
                    public void onException(Throwable e) {
                     System.out.printf("%-10d Exception %s %n", index, e);
                     e.printStackTrace();
                    }
             });
     }
 // 等待5s
 countDownLatch.await(5, TimeUnit.SECONDS);
     // 如果不再发送消息,关闭Producer实例。
     producer.shutdown();
    }
}

单向发送消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。

public class OnewayProducer {
 public static void main(String[] args) throws Exception{
     // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
     // 设置NameServer的地址
        producer.setNamesrvAddr("192.168.1.65:9876");
     // 启动Producer实例
        producer.start();
     for (int i = 0; i < 100; i++) {
         // 创建消息,并指定Topic,Tag和消息体
         Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
         );
         // 发送单向消息,没有任何返回结果
         producer.sendOneway(msg);

     }
     // 如果不再发送消息,关闭Producer实例。
     producer.shutdown();
    }
}

消息消费

Push 模式 消费

public class Consumer {

 public static void main(String[] args) throws InterruptedException, MQClientException {

     // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

     // 设置NameServer的地址
        consumer.setNamesrvAddr("192.168.1.65:9876");

     // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe("TopicTest""*");
     // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
 }
}

Pull 模式消费 - Assign

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

public class LitePullConsumerAssign {

    public static volatile boolean running = true;

    public static void main(String[] args) throws Exception {
        // 创建 LitePullConsumer 实例
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
        // 不自动提交 消息消费的偏移量
        litePullConsumer.setAutoCommit(false);
        // 启动消费者
        litePullConsumer.start();
        // 拉取topicTest的MessageQueue列表
        Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest");
        List<MessageQueue> list = new ArrayList<>(mqSet);
        List<MessageQueue> assignList = new ArrayList<>();
        for (int i = 0; i < list.size() / 2; i++) {
            assignList.add(list.get(i));
        }
        // 本消费分配的 MessageQueue 列表。本消费者只会消费assignList中的消息
        litePullConsumer.assign(assignList);
        // 改变下一次拉取时,消息的偏移量。
        litePullConsumer.seek(assignList.get(0), 10);
        try {
            while (running) {
                // 消息拉取 API,默认超时时间为 5s。
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s %n", messageExts);
                // 提交偏移量
                litePullConsumer.commitSync();
            }
        } finally {
            litePullConsumer.shutdown();
        }
    }
}

Pull 模式消费 - SUBCRIBE

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class LitePullConsumerSubscribe {

    public static volatile boolean running = true;

    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
        litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        litePullConsumer.subscribe("TopicTest""*");
        litePullConsumer.start();
        try {
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s%n", messageExts);
            }
        } finally {
            litePullConsumer.shutdown();
        }
    }
}

最后

期望和你一起遇见更好的自己


文章转载自方家小白,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论