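Getting this post working took several attempts. Many people who set up RocketMQ in Spring Boot for the first time run into all kinds of trouble, and I was no exception. For example:
Pitfalls:
This post covers the basics of sending and consuming messages, so you can get a feel for the overall flow.
Before starting, we need a running NameServer and a running broker.
I covered that in the previous post, linked here: Starting the message service
Step 1
Create a Maven project with the following pom file: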
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>rocketmq_demo</groupId>
    <artifactId>rocketmq_demo</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-tools</artifactId>
            <version>4.2.0</version>
        </dependency>
    </dependencies>
</project>
Step 2
Create the startup class, RocketMQApp. This class also acts as the producer.
package com;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author yanlin
 * @version v1.3
 * @date 2019-01-24 3:43 PM
 * @since v8.0
 **/
@SpringBootApplication
public class RocketMQApp {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        SpringApplication.run(RocketMQApp.class, args);
        // Producer group name; rename it to something unique for your application.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Address of the NameServer started beforehand.
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        try {
            for (int i = 0; i < 20; i++) {
                try {
                    Message msg = new Message("TopicTest" /* Topic */,
                            "TagA" /* Tag */,
                            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                    );
                    // Synchronous send: blocks until the broker acknowledges the message.
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    // Back off briefly before trying the next message.
                    Thread.sleep(1000);
                }
            }
        } finally {
            // Release the producer's resources even if the loop is interrupted.
            producer.shutdown();
        }
    }
}
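The producer above converts each string to bytes before sending it. As a minimal sketch of just that step, using only the JDK, the following shows the encode and decode round trip with `StandardCharsets.UTF_8`, which avoids the checked `UnsupportedEncodingException` that `getBytes(String charsetName)` declares. It assumes the charset in use is UTF-8 (which `RemotingHelper.DEFAULT_CHARSET` is understood to name); the class and method names are illustrative and not part of RocketMQ.

```java
import java.nio.charset.StandardCharsets;

// Illustrative helper, not part of RocketMQ: encodes and decodes message bodies as UTF-8.
class BodyCodecSketch {

    // Builds the same text the producer loop sends and encodes it to bytes.
    static byte[] encode(int i) {
        return ("Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8);
    }

    // Turns a body back into text, as a consumer would do with the bytes it receives.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        for (int i : new int[] {0, 9, 10, 19}) {
            byte[] body = encode(i);
            System.out.println(decode(body) + " -> " + body.length + " bytes");
        }
    }
}
```

Bodies for i = 0 to 9 come out at 16 bytes and bodies for i = 10 to 19 at 17 bytes, which lines up with the body=16 and body=17 values in the consumer output later in this post.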
Step 3
Create the consumer, RocketMQConsumer, with the following code:
package com.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @author yanlin
 * @version v1.3
 * @date 2019-01-24 3:24 PM
 * @since v8.0
 **/
public class RocketMQConsumer {
    public static void main(String[] args) throws MQClientException {
        // Consumer group name; rename it to something unique for your application.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // A brand-new consumer group starts reading from the earliest available offset.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Subscribe to every tag under TopicTest.
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                // Pass the values as arguments so a '%' inside a message cannot break the format string.
                System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
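The listener above always returns CONSUME_SUCCESS. When real processing can fail, a common pattern is to catch the failure and return RECONSUME_LATER so that the batch is delivered again. The sketch below shows only that control flow with plain JDK types: the `Status` enum is a local stand-in mirroring the two constants of `ConsumeConcurrentlyStatus`, and `process` is a hypothetical business method. Neither is RocketMQ code.

```java
import java.util.Arrays;
import java.util.List;

// Stand-alone sketch of the "success or retry later" decision made inside a message listener.
class ConsumeStatusSketch {

    // Local stand-in for ConsumeConcurrentlyStatus (assumption: only these two constants matter here).
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    // Hypothetical business logic: rejects empty bodies.
    static void process(String body) {
        if (body == null || body.isEmpty()) {
            throw new IllegalArgumentException("empty message body");
        }
        System.out.println("processed: " + body);
    }

    // Mirrors the shape of consumeMessage: handle the batch, report one status for all of it.
    static Status consume(List<String> bodies) {
        try {
            for (String body : bodies) {
                process(body);
            }
            return Status.CONSUME_SUCCESS;
        } catch (RuntimeException e) {
            // Any failure marks the whole batch for redelivery.
            System.err.println("processing failed: " + e.getMessage());
            return Status.RECONSUME_LATER;
        }
    }

    public static void main(String[] args) {
        System.out.println(consume(Arrays.asList("Hello RocketMQ 0", "Hello RocketMQ 1")));
        System.out.println(consume(Arrays.asList("Hello RocketMQ 2", "")));
    }
}
```

Because one status covers the whole batch, messages handled before the failure would be seen again on redelivery, so the processing step is best kept idempotent.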
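Final step
Right-click the startup class RocketMQApp and run it, then watch the console:
The main log lines for the sent messages are as follows: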
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D94180000, offsetMsgId=0A08406F00002A9F0000000000058B56, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=0], queueOffset=505]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D94270001, offsetMsgId=0A08406F00002A9F0000000000058C08, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=1], queueOffset=505]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D942A0002, offsetMsgId=0A08406F00002A9F0000000000058CBA, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=2], queueOffset=505]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D942D0003, offsetMsgId=0A08406F00002A9F0000000000058D6C, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=3], queueOffset=505]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D942F0004, offsetMsgId=0A08406F00002A9F0000000000058E1E, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=0], queueOffset=506]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D94310005, offsetMsgId=0A08406F00002A9F0000000000058ED0, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=1], queueOffset=506]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D943C0006, offsetMsgId=0A08406F00002A9F0000000000058F82, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=2], queueOffset=506]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D943E0007, offsetMsgId=0A08406F00002A9F0000000000059034, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=3], queueOffset=506]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D94410008, offsetMsgId=0A08406F00002A9F00000000000590E6, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=0], queueOffset=507]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D94440009, offsetMsgId=0A08406F00002A9F0000000000059198, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=1], queueOffset=507]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D944C000A, offsetMsgId=0A08406F00002A9F000000000005924A, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=2], queueOffset=507]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D944E000B, offsetMsgId=0A08406F00002A9F00000000000592FD, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=3], queueOffset=507]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D9452000C, offsetMsgId=0A08406F00002A9F00000000000593B0, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=0], queueOffset=508]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D9455000D, offsetMsgId=0A08406F00002A9F0000000000059463, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=1], queueOffset=508]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D945A000E, offsetMsgId=0A08406F00002A9F0000000000059516, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=2], queueOffset=508]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D945E000F, offsetMsgId=0A08406F00002A9F00000000000595C9, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=3], queueOffset=508]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D94600010, offsetMsgId=0A08406F00002A9F000000000005967C, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=0], queueOffset=509]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D94650011, offsetMsgId=0A08406F00002A9F000000000005972F, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=1], queueOffset=509]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D94670012, offsetMsgId=0A08406F00002A9F00000000000597E2, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=2], queueOffset=509]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D946C0013, offsetMsgId=0A08406F00002A9F0000000000059895, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=3], queueOffset=509]
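In the log above, queueId cycles through 0 to 3, and each queue's queueOffset grows by one every fourth message. The sketch below reproduces that pattern with simple round-robin arithmetic. It is a simulation only: the four queues and the starting offset 505 are read off this particular log, and the real producer's queue selection may start at a different index and take broker availability into account.

```java
// Simulates how 20 messages spread over 4 queues in strict round-robin order.
class RoundRobinSketch {

    static final int QUEUE_COUNT = 4;     // assumption: taken from the queueId values in the log
    static final long START_OFFSET = 505; // assumption: first queueOffset seen in the log

    // Queue chosen for the i-th message (0-based).
    static int queueId(int i) {
        return i % QUEUE_COUNT;
    }

    // Offset of the i-th message inside its queue.
    static long queueOffset(int i) {
        return START_OFFSET + i / QUEUE_COUNT;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 20; i++) {
            System.out.printf("msg %2d -> queueId=%d, queueOffset=%d%n", i, queueId(i), queueOffset(i));
        }
    }
}
```

Under these assumptions the last message (i = 19) lands in queue 3 at offset 509, the same values as the final SendResult line.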
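Right-click the consumer RocketMQConsumer and run it; the console output is shown below.
Everything after "Receive New Messages" is the received message. Note that the body field shows the body length in bytes (16 or 17 here), not the text itself.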
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=178, queueOffset=505, sysFlag=0, bornTimestamp=1548321807386, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807396, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000058B56, commitLogOffset=363350, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=506, CONSUME_START_TIME=1548321807411, UNIQ_KEY=0A08406F070318B4AAC27A2D94180000, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=1, storeSize=178, queueOffset=505, sysFlag=0, bornTimestamp=1548321807400, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807400, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000058C08, commitLogOffset=363528, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=506, CONSUME_START_TIME=1548321807411, UNIQ_KEY=0A08406F070318B4AAC27A2D94270001, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_5 Receive New Messages: [MessageExt [queueId=0, storeSize=178, queueOffset=506, sysFlag=0, bornTimestamp=1548321807407, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807408, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000058E1E, commitLogOffset=364062, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=507, CONSUME_START_TIME=1548321807413, UNIQ_KEY=0A08406F070318B4AAC27A2D942F0004, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=3, storeSize=178, queueOffset=505, sysFlag=0, bornTimestamp=1548321807405, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807406, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000058D6C, commitLogOffset=363884, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=506, CONSUME_START_TIME=1548321807412, UNIQ_KEY=0A08406F070318B4AAC27A2D942D0003, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=2, storeSize=178, queueOffset=505, sysFlag=0, bornTimestamp=1548321807402, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807403, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000058CBA, commitLogOffset=363706, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=506, CONSUME_START_TIME=1548321807413, UNIQ_KEY=0A08406F070318B4AAC27A2D942A0002, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=1, storeSize=178, queueOffset=506, sysFlag=0, bornTimestamp=1548321807409, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807415, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000058ED0, commitLogOffset=364240, bodyCRC=1424393152, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=507, CONSUME_START_TIME=1548321807418, UNIQ_KEY=0A08406F070318B4AAC27A2D94310005, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_7 Receive New Messages: [MessageExt [queueId=2, storeSize=178, queueOffset=506, sysFlag=0, bornTimestamp=1548321807420, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807421, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000058F82, commitLogOffset=364418, bodyCRC=1307562618, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=507, CONSUME_START_TIME=1548321807423, UNIQ_KEY=0A08406F070318B4AAC27A2D943C0006, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_8 Receive New Messages: [MessageExt [queueId=3, storeSize=178, queueOffset=506, sysFlag=0, bornTimestamp=1548321807422, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807423, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000059034, commitLogOffset=364596, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=507, CONSUME_START_TIME=1548321807426, UNIQ_KEY=0A08406F070318B4AAC27A2D943E0007, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_9 Receive New Messages: [MessageExt [queueId=0, storeSize=178, queueOffset=507, sysFlag=0, bornTimestamp=1548321807425, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807427, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F00000000000590E6, commitLogOffset=364774, bodyCRC=710410109, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=508, CONSUME_START_TIME=1548321807430, UNIQ_KEY=0A08406F070318B4AAC27A2D94410008, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_10 Receive New Messages: [MessageExt [queueId=1, storeSize=178, queueOffset=507, sysFlag=0, bornTimestamp=1548321807428, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807430, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000059198, commitLogOffset=364952, bodyCRC=1565577195, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=508, CONSUME_START_TIME=1548321807432, UNIQ_KEY=0A08406F070318B4AAC27A2D94440009, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_11 Receive New Messages: [MessageExt [queueId=2, storeSize=179, queueOffset=507, sysFlag=0, bornTimestamp=1548321807436, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807437, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F000000000005924A, commitLogOffset=365130, bodyCRC=193412630, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=508, CONSUME_START_TIME=1548321807439, UNIQ_KEY=0A08406F070318B4AAC27A2D944C000A, WAIT=true, TAGS=TagA}, body=17]]]
ConsumeMessageThread_12 Receive New Messages: [MessageExt [queueId=3, storeSize=179, queueOffset=507, sysFlag=0, bornTimestamp=1548321807438, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807439, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F00000000000592FD, commitLogOffset=365309, bodyCRC=2088767104, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=508, CONSUME_START_TIME=1548321807442, UNIQ_KEY=0A08406F070318B4AAC27A2D944E000B, WAIT=true, TAGS=TagA}, body=17]]]
ConsumeMessageThread_13 Receive New Messages: [MessageExt [queueId=0, storeSize=179, queueOffset=508, sysFlag=0, bornTimestamp=1548321807442, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807443, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F00000000000593B0, commitLogOffset=365488, bodyCRC=1703501626, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=509, CONSUME_START_TIME=1548321807447, UNIQ_KEY=0A08406F070318B4AAC27A2D9452000C, WAIT=true, TAGS=TagA}, body=17]]]
ConsumeMessageThread_14 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=508, sysFlag=0, bornTimestamp=1548321807445, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807446, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000059463, commitLogOffset=365667, bodyCRC=311324588, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=509, CONSUME_START_TIME=1548321807448, UNIQ_KEY=0A08406F070318B4AAC27A2D9455000D, WAIT=true, TAGS=TagA}, body=17]]]
ConsumeMessageThread_15 Receive New Messages: [MessageExt [queueId=2, storeSize=179, queueOffset=508, sysFlag=0, bornTimestamp=1548321807450, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807451, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000059516, commitLogOffset=365846, bodyCRC=216726031, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=509, CONSUME_START_TIME=1548321807453, UNIQ_KEY=0A08406F070318B4AAC27A2D945A000E, WAIT=true, TAGS=TagA}, body=17]]]
ConsumeMessageThread_16 Receive New Messages: [MessageExt [queueId=3, storeSize=179, queueOffset=508, sysFlag=0, bornTimestamp=1548321807454, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807454, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F00000000000595C9, commitLogOffset=366025, bodyCRC=2079181465, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=509, CONSUME_START_TIME=1548321807457, UNIQ_KEY=0A08406F070318B4AAC27A2D945E000F, WAIT=true, TAGS=TagA}, body=17]]]
ConsumeMessageThread_17 Receive New Messages: [MessageExt [queueId=0, storeSize=179, queueOffset=509, sysFlag=0, bornTimestamp=1548321807456, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807458, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F000000000005967C, commitLogOffset=366204, bodyCRC=1659149091, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=510, CONSUME_START_TIME=1548321807460, UNIQ_KEY=0A08406F070318B4AAC27A2D94600010, WAIT=true, TAGS=TagA}, body=17]]]
ConsumeMessageThread_18 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=509, sysFlag=0, bornTimestamp=1548321807461, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807462, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F000000000005972F, commitLogOffset=366383, bodyCRC=367242165, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=510, CONSUME_START_TIME=1548321807464, UNIQ_KEY=0A08406F070318B4AAC27A2D94650011, WAIT=true, TAGS=TagA}, body=17]]]
ConsumeMessageThread_19 Receive New Messages: [MessageExt [queueId=2, storeSize=179, queueOffset=509, sysFlag=0, bornTimestamp=1548321807463, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807467, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F00000000000597E2, commitLogOffset=366562, bodyCRC=89962020, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=510, CONSUME_START_TIME=1548321807469, UNIQ_KEY=0A08406F070318B4AAC27A2D94670012, WAIT=true, TAGS=TagA}, body=17]]]
ConsumeMessageThread_20 Receive New Messages: [MessageExt [queueId=3, storeSize=179, queueOffset=509, sysFlag=0, bornTimestamp=1548321807468, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807469, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000059895, commitLogOffset=366741, bodyCRC=1918600882, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=510, CONSUME_START_TIME=1548321807471, UNIQ_KEY=0A08406F070318B4AAC27A2D946C0013, WAIT=true, TAGS=TagA}, body=17]]]
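Comparing the two logs, the consumer's msgId equals the producer's offsetMsgId, and UNIQ_KEY equals the producer's msgId. The offsetMsgId itself appears to pack the broker address and the commit log position as hex: 4 bytes of IPv4 address, 4 bytes of port, and 8 bytes of offset. The sketch below decodes it under that assumption (IPv4 only; the class and method names are illustrative), so the result can be checked against storeHost and commitLogOffset in the log.

```java
// Decodes a 32-hex-character offsetMsgId.
// Assumed layout: IPv4 address (4 bytes) + port (4 bytes) + commit log offset (8 bytes).
class OffsetMsgIdSketch {

    // First 8 hex characters: one byte per dotted-decimal part of the broker IP.
    static String brokerIp(String id) {
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) {
                ip.append('.');
            }
            ip.append(Integer.parseInt(id.substring(i * 2, i * 2 + 2), 16));
        }
        return ip.toString();
    }

    // Next 8 hex characters: the broker port.
    static int brokerPort(String id) {
        return Integer.parseInt(id.substring(8, 16), 16);
    }

    // Last 16 hex characters: the position of the message in the commit log.
    static long commitLogOffset(String id) {
        return Long.parseLong(id.substring(16, 32), 16);
    }

    public static void main(String[] args) {
        String id = "0A08406F00002A9F0000000000058B56"; // first offsetMsgId from the log
        System.out.println(brokerIp(id) + ":" + brokerPort(id) + " @ " + commitLogOffset(id));
    }
}
```

For the first message this yields 10.8.64.111:10911 and offset 363350, matching storeHost and commitLogOffset in the first consumer line.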
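We sent 20 messages and all 20 were consumed successfully. That is the simplest RocketMQ use case. In later posts I will go into more detail, so that you can apply RocketMQ in a wider range of scenarios and understand how each of these objects works.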
Copyright notice: this is a reposted article; copyright belongs to the original author.
Original link: https://blog.csdn.net/weixin_38003389/article/details/86591212