添加RocketMQ依赖
Maven导入如下依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
Gradle导入如下依赖
compile 'org.apache.rocketmq:rocketmq-client:4.3.0'
同步发送消息
/**
* 同步推送消息
* @author Breeze
* @version V1.0
* @date 2019/1/31 18:32
*/
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 使用生产者组名进行实例化
DefaultMQProducer producer = new
DefaultMQProducer("dbnewyouth-sync");
// 配置RocketMQ的地址
producer.setNamesrvAddr("192.168.0.180:9876");
// 开启生产者
producer.start();
for (int i = 1; i <= 100; i++) {
/*
* 创建一个实例
* topic:指定一个主题 例如.TopicTest
* tags:指定一个标签 例如.TagA
* msg-body:消息体 例如.("你好 RocketMQ,这是第 " + i + "条消息").getBytes(RemotingHelper.DEFAULT_CHARSET)
* RemotingHelper.DEFAULT_CHARSET:指定编码为UTF-8
*/
Message msg = new Message("TopicTest","TagA",("你好 RocketMQ,这是第 " + i + "条消息").getBytes(RemotingHelper.DEFAULT_CHARSET));
// 推送消息
SendResult sendResult = producer.send(msg);
System.out.println("第" + i +"条推送状态:" + sendResult.getSendStatus());
}
// 关闭生产者
producer.shutdown();
}
}
异步消息推送
/**
* 异步消息推送
* @author Breeze
* @version V1.0
* @date 2019/1/31 19:00
*/
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 使用生产者组名进行实例化
DefaultMQProducer producer = new DefaultMQProducer("dbnewyouth-async");
// 配置RocketMQ服务地址
producer.setNamesrvAddr("192.168.0.180:9876");
// 开始生产者
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 1; i <= messageCount; i++) {
final int index = i;
/*
* 创建一个实例
* topic:指定一个主题 例如.TopicTest
* tags:指定一个标签 例如.TagA
* msg-body:消息体 例如.("你好 RocketMQ,这是第 " + i + "条消息").getBytes(RemotingHelper.DEFAULT_CHARSET)
* RemotingHelper.DEFAULT_CHARSET:指定编码为UTF-8
*/
Message msg = new Message("TopicTestASync","TagA","OrderID188",("你好 RocketMQ,这是第 " + i + "条消息").getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("推送状态:" + sendResult.getSendStatus());
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
}
// 关闭生产者
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}
消费者
/**
* 消费者
* @author Breeze
* @version V1.0
* @date 2019/1/31 18:46
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 使用生产者组名进行实例化
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 配置RocketMQ地址
consumer.setNamesrvAddr("192.168.0.180:9876");
// 订阅一个需要消费的主题
consumer.subscribe("TopicTest", "*");
// 注册一个回调,获取到消息后执行
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
byte[] msgBody = msg.getBody();
System.out.println("执行消费:" + new String(msgBody));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 开启消费者
consumer.start();
System.out.println("消费者已启动");
}
}
评论