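Spring Cloud Alibaba 04: Producing and Consuming Messages with RocketMQ
Downloading, Installing, and Configuring RocketMQ
Official download page: http://rocketmq.apache.org/dowloading/releases/
Create a rocket directory under /usr/local, upload rocketmq-all-4.7.1-bin-release.zip into it, and unzip it: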
unzip rocketmq-all-4.7.1-bin-release.zip
If the shell reports that the unzip command is not installed, install it with:
yum install -y unzip zip
After unzipping, change into /usr/local/rocket/rocketmq-all-4.7.1-bin-release/bin and start the NameServer.
Use the server's public IP here, not localhost!
nohup ./mqnamesrv -n 118.31.106.51:9876 &
Check that it started successfully:
netstat -an | grep 9876
or
jps
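The same check can be done from a client machine, which also confirms that port 9876 is reachable through the firewall. The following is a minimal sketch using only the JDK; PortCheck and isListening are hypothetical names that are not part of RocketMQ, and the default host and port are assumptions.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of RocketMQ: tests whether a TCP port accepts connections.
final class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed defaults; pass your own host and port as arguments.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9876;
        System.out.println(host + ":" + port + " listening = " + isListening(host, port, 3000));
    }
}
```

A result of false from a remote machine while netstat shows the port open on the server usually points to a firewall or security-group rule rather than to RocketMQ itself.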
Before starting the Broker, adjust the JVM memory settings in runserver.sh, runbroker.sh, and tools.sh:
vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
-----------------------------------------------------------------------------------
vim runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m"
-----------------------------------------------------------------------------------
vim tools.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
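To see how a given set of heap flags is actually interpreted by the JVM, a tiny class can print the maximum heap it was granted. This is only a sketch; HeapCheck is a hypothetical name, and it would have to be launched with the same -Xms/-Xmx flags as above to be meaningful.

```java
// Hypothetical helper: reports the maximum heap size the running JVM will use.
final class HeapCheck {

    // Maximum heap in MiB as reported by the JVM (typically a little below -Xmx).
    static long maxHeapMiB() {
        return Runtime.getRuntime().maxMemory() / (1024L * 1024L);
    }

    public static void main(String[] args) {
        System.out.println("Max heap: " + maxHeapMiB() + " MiB");
    }
}
```

The reported value is usually slightly smaller than -Xmx because the JVM excludes part of the young generation from the figure.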
Start the Broker (use the public IP, not localhost, and do not omit any of the parameters!):
nohup sh ./mqbroker -n 118.31.106.51:9876 -c ../conf/broker.conf autoCreateTopicEnable=true &
View the log:
tail -f ~/logs/rocketmqlogs/broker.log
# The following log line indicates a successful start
2020-10-26 11:07:01 INFO main - The broker[izbp10tup89om84qulgxbsz, 172.20.171.240:10911] boot success. serializeType=JSON and name server is localhost:9876
From the bin directory, test sending and receiving messages with RocketMQ:
# Set the NameServer IP and port
export NAMESRV_ADDR=localhost:9876
# Send messages
./tools.sh org.apache.rocketmq.example.quickstart.Producer
# Receive messages
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
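The export step matters because the quickstart classes do not set a NameServer address in code. To my understanding, the RocketMQ client then falls back to the rocketmq.namesrv.addr system property and after that to the NAMESRV_ADDR environment variable. The sketch below imitates that lookup order with plain JDK code; NameServerAddress, resolve, and the fallback value are hypothetical and only illustrate the idea.

```java
// Hypothetical helper that imitates the lookup order described above.
final class NameServerAddress {

    // Returns the first non-blank value among system property, environment variable, fallback.
    static String resolve(String systemProperty, String envValue, String fallback) {
        if (systemProperty != null && !systemProperty.trim().isEmpty()) {
            return systemProperty.trim();
        }
        if (envValue != null && !envValue.trim().isEmpty()) {
            return envValue.trim();
        }
        return fallback;
    }

    public static void main(String[] args) {
        String addr = resolve(
                System.getProperty("rocketmq.namesrv.addr"),
                System.getenv("NAMESRV_ADDR"),
                "localhost:9876"); // assumed fallback for this sketch only
        System.out.println("NameServer address: " + addr);
    }
}
```

In application code it is usually clearer to call setNamesrvAddr explicitly, as the Java examples later in this article do.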
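Installing the RocketMQ Console
Download: https://github.com/apache/rocketmq-externals
First edit broker.conf in RocketMQ's conf directory and add the following two settings so that remote clients can connect, then restart the Broker so the changes take effect: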
brokerIP1=118.31.106.51
namesrvAddr=118.31.106.51:9876
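Edit the application.properties file of the downloaded rocketmq-console:
server.port=9877
rocketmq.config.namesrvAddr=118.31.106.51:9876
Build the package from the rocketmq-console root directory:
mvn clean package -Dmaven.test.skip=true
Run the jar generated in the target directory:
java -jar rocketmq-console-ng-1.0.0.jar
Then open http://localhost:9877 to access the console (replace localhost with the server's IP if the console runs on a remote machine).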
Testing Message Production and Consumption in Java
Add the RocketMQ dependency:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
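Test producing a message:
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Test {
    public static void main(String[] args) throws Exception {
        // Create the message producer
        DefaultMQProducer producer = new DefaultMQProducer("blu-producer-group");
        // Set the NameServer address
        producer.setNamesrvAddr("118.31.106.51:9876");
        // Start the producer
        producer.start();
        // Build the message: topic, tag, body
        Message message = new Message("bluTopic", "bluTag", "Test MQ".getBytes(StandardCharsets.UTF_8));
        // Send the message with a 5000 ms timeout
        SendResult result = producer.send(message, 5000);
        System.out.println(result);
        // Shut down the producer
        producer.shutdown();
    }
}
Output: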
SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000018B4AAC2841C69D40000, offsetMsgId=761F6A3300002A9F00000000000C6301, messageQueue=MessageQueue [topic=bluTopic, brokerName=broker-a, queueId=3], queueOffset=0]
15:40:55.690 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[118.31.106.51:9876] result: true
15:40:55.702 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[118.31.106.51:10911] result: true
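The offsetMsgId in the SendResult above is not random. For a Broker with an IPv4 address it is, as far as I know, a 32-character hex string made of the Broker IP (4 bytes), the Broker port (4 bytes), and the commit log offset (8 bytes). The sketch below decodes it with JDK code only; OffsetMsgId is a hypothetical class written for this article, not a RocketMQ API, and it assumes the IPv4 layout.

```java
// Hypothetical decoder for an IPv4 offsetMsgId; not a RocketMQ class.
final class OffsetMsgId {
    final String brokerIp;
    final int brokerPort;
    final long commitLogOffset;

    private OffsetMsgId(String brokerIp, int brokerPort, long commitLogOffset) {
        this.brokerIp = brokerIp;
        this.brokerPort = brokerPort;
        this.commitLogOffset = commitLogOffset;
    }

    // Splits the 32 hex characters into IP (8), port (8), and offset (16).
    static OffsetMsgId parse(String hex) {
        if (hex == null || hex.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters (IPv4 layout assumed)");
        }
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) {
                ip.append('.');
            }
            ip.append(Integer.parseInt(hex.substring(i * 2, i * 2 + 2), 16));
        }
        int port = (int) Long.parseLong(hex.substring(8, 16), 16);
        long offset = Long.parseUnsignedLong(hex.substring(16, 32), 16);
        return new OffsetMsgId(ip.toString(), port, offset);
    }

    @Override
    public String toString() {
        return brokerIp + ":" + brokerPort + " offset=" + commitLogOffset;
    }

    public static void main(String[] args) {
        // Value copied from the SendResult output above.
        System.out.println(parse("761F6A3300002A9F00000000000C6301"));
        // prints "118.31.106.51:10911 offset=811777"
    }
}
```

The decoded address matches the Broker connection closed in the log (118.31.106.51:10911), which makes this a quick way to tell which Broker stored a given message.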
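Test consuming messages:
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class ConsumerTest {
    public static void main(String[] args) throws MQClientException {
        // Create the message consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("blu-consumer-group");
        // Set the NameServer address
        consumer.setNamesrvAddr("118.31.106.51:9876");
        // Subscribe to the topic; "*" matches every tag
        consumer.subscribe("bluTopic", "*");
        // Register the callback invoked for each batch of messages
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println("message ==>" + list);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer
        consumer.start();
    }
}
Once started, the consumer keeps listening and consumes each message as soon as it is produced: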
message ==>[MessageExt [queueId=3, storeSize=217, queueOffset=0, sysFlag=0, bornTimestamp=1603698054612, bornHost=/122.96.140.226:34668, storeTimestamp=1603698054317, storeHost=/118.31.106.51:10911, msgId=761F6A3300002A9F00000000000C6301, commitLogOffset=811777, bodyCRC=689321773, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='bluTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1603871119123, UNIQ_KEY=00000000000000000000000000000001000018B4AAC2841C69D40000, CLUSTER=DefaultCluster, WAIT=true, TAGS=bluTag}, body=[84, 101, 115, 116, 32, 77, 81], transactionId='null'}]]
message ==>[MessageExt [queueId=2, storeSize=217, queueOffset=0, sysFlag=0, bornTimestamp=1603697450377, bornHost=/112.86.129.72:18631, storeTimestamp=1603697450096, storeHost=/118.31.106.51:10911, msgId=761F6A3300002A9F00000000000C6228, commitLogOffset=811560, bodyCRC=689321773, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='bluTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1603871119123, UNIQ_KEY=00000000000000000000000000000001000018B4AAC2841331850000, CLUSTER=DefaultCluster, WAIT=true, TAGS=bluTag}, body=[84, 101, 115, 116, 32, 77, 81], transactionId='null'}]]
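Printing the MessageExt list shows the body only as raw bytes (body=[84, 101, 115, 116, 32, 77, 81]). To read the payload, the bytes have to be decoded back into a String. A minimal sketch follows, assuming the producer encoded the text as UTF-8; BodyDecoder is a hypothetical helper name.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: turns a message body back into text, assuming UTF-8 encoding.
final class BodyDecoder {

    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Bytes copied from the consumer output above.
        byte[] body = {84, 101, 115, 116, 32, 77, 81};
        System.out.println(decode(body)); // prints "Test MQ"
    }
}
```

Inside the listener, the same decoding would be applied to each message's getBody() result instead of printing the whole list.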
Integrating RocketMQ with Spring Boot
Service provider (Provider):
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
# RocketMQ configuration
rocketmq:
  name-server: 118.31.106.51:9876
  producer:
    group: blu-provider
import java.util.Date;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    private Integer id;
    private String buyerName;
    private String buyerTel;
    private String address;
    private Date createDate;
}
// Inside the Provider's controller class:
@Autowired
private RocketMQTemplate rocketMQTemplate;

@GetMapping("create")
public Order create() {
    Order order = new Order(1, "BLU", "15651776666", "江苏南京", new Date());
    // Send the order to the blu-order-topic topic
    this.rocketMQTemplate.convertAndSend("blu-order-topic", order);
    return order;
}
Service consumer (Consumer):
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
rocketmq:
  name-server: 118.31.106.51:9876
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
private Integer id;
private String buyerName;
private String buyerTel;
private String address;
private Date createDate;
}
package com.blu.service;
import com.blu.entity.Order;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(consumerGroup = "blu-consumer",topic = "blu-order-topic")
public class MessageService implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
System.out.println("新的订单:"+order);
}
}
Start both services. Each time the Provider's /create endpoint is requested and a message is produced, the Consumer receives it right away:
新的订单:Order(id=1, buyerName=BLU, buyerTel=15651776666, address=江苏南京, createDate=Wed Oct 28 17:08:13 CST 2020)
新的订单:Order(id=1, buyerName=BLU, buyerTel=15651776666, address=江苏南京, createDate=Wed Oct 28 17:09:44 CST 2020)
Copyright notice: this article is a repost; copyright belongs to the original author.
Original link: https://blucoding.blog.csdn.net/article/details/109350788