RocketMQ的使用

2019-02-27   203 次阅读


添加RocketMQ依赖

Maven导入如下依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>

Gradle导入如下依赖

compile 'org.apache.rocketmq:rocketmq-client:4.3.0'

同步发送消息

/**
 * 同步推送消息
 * @author Breeze
 * @version V1.0
 * @date 2019/1/31 18:32
 */
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 使用生产者组名进行实例化
        DefaultMQProducer producer = new
                DefaultMQProducer("dbnewyouth-sync");
        // 配置RocketMQ的地址
        producer.setNamesrvAddr("192.168.0.180:9876");
        // 开启生产者
        producer.start();
        for (int i = 1; i <= 100; i++) {
            /*
             * 创建一个实例
             * topic:指定一个主题 例如.TopicTest
             * tags:指定一个标签 例如.TagA
             * msg-body:消息体 例如.("你好 RocketMQ,这是第 " + i + "条消息").getBytes(RemotingHelper.DEFAULT_CHARSET)
             * RemotingHelper.DEFAULT_CHARSET:指定编码为UTF-8
             */
            Message msg = new Message("TopicTest","TagA",("你好 RocketMQ,这是第 " + i + "条消息").getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 推送消息
            SendResult sendResult = producer.send(msg);
            System.out.println("第" + i +"条推送状态:" + sendResult.getSendStatus());
        }
        // 关闭生产者
        producer.shutdown();
    }
}

异步消息推送

/**
 * 异步消息推送
 * @author Breeze
 * @version V1.0
 * @date 2019/1/31 19:00
 */
public class AsyncProducer {

    public static void main(String[] args) throws Exception {
        // 使用生产者组名进行实例化
        DefaultMQProducer producer = new DefaultMQProducer("dbnewyouth-async");
        // 配置RocketMQ服务地址
        producer.setNamesrvAddr("192.168.0.180:9876");
        // 开始生产者
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
        int messageCount = 100;
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 1; i <= messageCount; i++) {
            final int index = i;
             /*
             * 创建一个实例
             * topic:指定一个主题 例如.TopicTest
             * tags:指定一个标签 例如.TagA
             * msg-body:消息体 例如.("你好 RocketMQ,这是第 " + i + "条消息").getBytes(RemotingHelper.DEFAULT_CHARSET)
             * RemotingHelper.DEFAULT_CHARSET:指定编码为UTF-8
             */
            Message msg = new Message("TopicTestASync","TagA","OrderID188",("你好 RocketMQ,这是第 " + i + "条消息").getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("推送状态:" + sendResult.getSendStatus());
                }

                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                }
            });
        }
        // 关闭生产者
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }

}

消费者

/**
 * 消费者
 * @author Breeze
 * @version V1.0
 * @date 2019/1/31 18:46
 */
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 使用生产者组名进行实例化
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // 配置RocketMQ地址
        consumer.setNamesrvAddr("192.168.0.180:9876");

        // 订阅一个需要消费的主题
        consumer.subscribe("TopicTest", "*");
        // 注册一个回调,获取到消息后执行
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    byte[] msgBody = msg.getBody();
                    System.out.println("执行消费:" + new String(msgBody));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 开启消费者
        consumer.start();

        System.out.println("消费者已启动");
    }

}

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议

让人非我弱,得志莫离群