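We use the mainstream Spring Boot framework to integrate RocketMQ, which is quick and convenient.
The final project consists of a parent project, rocketmq-test, and two modules, rocketmq-provider and rocketmq-consumer.
The steps are as follows:
rocketmq-test
The parent project uses pom packaging. It handles dependency management, including version management, and declares the child modules. pom.xml: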
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <packaging>pom</packaging>
    <modules>
        <module>rocketmq-provider</module>
        <module>rocketmq-consumer</module>
    </modules>
    <groupId>com.java1234</groupId>
    <artifactId>rocketmq-test2</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <springboot.version>2.3.2.RELEASE</springboot.version>
        <rocketmq.version>2.2.0</rocketmq.version>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${springboot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-spring-boot-starter</artifactId>
                <version>${rocketmq.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
rocketmq-provider
Add the following dependencies to the module's pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rocketmq-test2</artifactId>
        <groupId>com.java1234</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>rocketmq-provider</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
        </dependency>
    </dependencies>
</project>
Create the project configuration file application.yml, which specifies the name-server address and the producer group:
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: producer-demo1
Create the message producer service class ProducerService:
package com.java1234.rocketmq;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Message producer.
 *
 * @author java1234_小锋
 * @site www.java1234.com
 * @company 南通小锋网络科技有限公司
 * @create 2021-08-22 22:16
 */
@Component("producerService")
public class ProducerService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends ten simple messages to the java1234-rocketmq topic.
     */
    public void sendMessage() {
        for (int i = 0; i < 10; i++) {
            rocketMQTemplate.convertAndSend("java1234-rocketmq", "Hello, RocketMQ! " + i);
        }
    }
}
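The rocketmq-spring-boot-starter provides the RocketMQTemplate template class, which can send messages in several ways.
ProducerService is annotated with @Component so that Spring manages the instance and other code can obtain the bean.
The send method targets the topic java1234-rocketmq.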
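The destination argument of the RocketMQTemplate send methods accepts either a bare topic name, as above, or the form topicName:tags. The following is a minimal sketch of a helper that builds such a string. The class Destinations, its method of, and the tag greeting are hypothetical names used only for illustration; they are not part of rocketmq-spring or of the original project, which does not use tags.

```java
/** Hypothetical helper (not part of rocketmq-spring) that builds a destination string. */
final class Destinations {

    private Destinations() {
        // Utility class, not meant to be instantiated.
    }

    /**
     * Returns the bare topic when no tag is given, otherwise "topic:tag".
     * The "topicName:tags" form is the destination format RocketMQTemplate accepts.
     */
    static String of(String topic, String tag) {
        if (topic == null || topic.trim().isEmpty()) {
            throw new IllegalArgumentException("topic must not be empty");
        }
        if (tag == null || tag.trim().isEmpty()) {
            return topic.trim();
        }
        return topic.trim() + ":" + tag.trim();
    }
}
```

With such a helper, the call in ProducerService could pass Destinations.of("java1234-rocketmq", null) as the first argument without changing behavior. Passing a tag such as "greeting" would yield java1234-rocketmq:greeting, which only matters once a consumer filters on tags.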
The startup class obtains the ProducerService instance and calls its send method:
package com.java1234;

import com.java1234.rocketmq.ProducerService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class RocketmqTestApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext run = SpringApplication.run(RocketmqTestApplication.class, args);
        ProducerService producerService = (ProducerService) run.getBean("producerService");
        producerService.sendMessage();
    }
}
After the application context starts, we look up the ProducerService bean by name and call sendMessage to send the messages.
rocketmq-consumer
Add the following dependencies to the module's pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rocketmq-test2</artifactId>
        <groupId>com.java1234</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>rocketmq-consumer</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
        </dependency>
    </dependencies>
</project>
Create the project configuration file application.yml, which specifies the name-server address and the consumer group:
server:
  port: 8084
  servlet:
    context-path: /
rocketmq:
  name-server: 127.0.0.1:9876
  consumer:
    group: consumer-demo1
Create the message consumer service class ConsumerService, which listens for messages and consumes them:
package com.java1234.rocketmq;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * Message consumer.
 *
 * @author java1234_小锋
 * @site www.java1234.com
 * @company 南通小锋网络科技有限公司
 * @create 2021-08-22 22:40
 */
@RocketMQMessageListener(topic = "java1234-rocketmq", consumerGroup = "${rocketmq.consumer.group}")
@Component
public class ConsumerService implements RocketMQListener<String> {

    /**
     * Called once for each message received from the topic.
     */
    @Override
    public void onMessage(String s) {
        System.out.println("Received message: " + s);
    }
}
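The consumer class must implement the RocketMQListener interface; the generic type parameter specifies the message type, here String.
The class must carry the @RocketMQMessageListener annotation, which specifies the topic java1234-rocketmq and the consumer group ${rocketmq.consumer.group} (resolved from application.yml).
The class also needs the @Component annotation so that Spring manages the bean instance.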
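To make the role of the type parameter concrete, here is a small in-memory sketch in plain Java. SimpleListener, RecordingStringListener and InMemoryTopic are hypothetical stand-ins written for this illustration. They only mimic the shape of RocketMQListener<T> and involve no broker, network, or consumer group, so they say nothing about how RocketMQ actually delivers messages.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in that mirrors the shape of RocketMQListener<T>. */
interface SimpleListener<T> {
    void onMessage(T message);
}

/** Listener fixed to String payloads, analogous to ConsumerService. */
final class RecordingStringListener implements SimpleListener<String> {

    private final List<String> received = new ArrayList<>();

    @Override
    public void onMessage(String message) {
        // The compiler guarantees that message is a String here.
        received.add(message);
    }

    List<String> received() {
        return received;
    }
}

/** In-memory stand-in for a topic; delivers synchronously to every subscriber. */
final class InMemoryTopic<T> {

    private final List<SimpleListener<T>> listeners = new ArrayList<>();

    void subscribe(SimpleListener<T> listener) {
        listeners.add(listener);
    }

    void send(T message) {
        for (SimpleListener<T> listener : listeners) {
            listener.onMessage(message);
        }
    }
}
```

Because the listener declares SimpleListener<String>, its onMessage method receives a String directly and needs no cast. Choosing a different type argument would change the parameter type in the same way.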
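Start the rocketmq-consumer project first so that it listens for messages.
Then start the rocketmq-provider project to send the messages.
The consumer receives the messages and prints one line for each message sent.
The test passes: the messages are consumed successfully.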
Original article: https://blog.csdn.net/caoli201314/article/details/119900570