在本教程中,您将学习如何在Spring Boot RabbitMQ应用程序中创建多个队列。您还将学习如何使用RabbitMQ在Spring引导应用程序中向生产者和消费者发送和接收消息。
注意,本教程将使用Springboot3和JDK17或更高版本。
RabbitMQ是一种消息队列软件(消息代理/队列管理器),充当不同应用程序可以发送和接收消息的中间平台。
我们将使用SpringAMQP模块在Spring引导应用程序中与RabbitMQ一起作为AMQP消息传递解决方案。
Producer是一个向RabbitMQ代理发送消息的应用程序,Consumer是从RabbitMQBroker读取消息的应用软件。
在本教程中,我们将实现以下Spring Boot RabbitMQ体系结构流:
我们将创建两个队列:
1.javaguides
2.javaguides_json
在“javaguides”队列中,我们将存储String类型的消息。
在“javaguides_json”队列中,我们将存储json类型的消息。
Exchange组件将使用路由密钥将邮件路由到相应的队列。
Docker-使用Docker在本地安装并设置RabbitMQ作为Docker容器。请参阅我在Install RabbitMQ using Docker上的单独指南。
使用https://start.spring.io/创建Spring引导项目,并提供屏幕截图中所示的详细信息:
以下是完整的pom.xml
文件,供您参考:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.0.0-SNAPSHOT</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>net.javaguides</groupId>
<artifactId>springboot-rabbitmq-tutorial</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-rabbitmq-tutorial</name>
<description>Demo project for Spring Boot and RabbitMQ</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<releases>
<enabled>false</enabled>
</releases>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<releases>
<enabled>false</enabled>
</releases>
</pluginRepository>
</pluginRepositories>
</project>
按照以下屏幕截图创建项目结构:
我们将使用默认用户名和密码“guest/guest”连接到端口5672上的RabbitMQ代理。
Spring引导自动配置将使用默认配置详细信息自动将Spring引导应用程序与RabbitMQ连接起来,但您可以根据应用程序中的环境进行修改。属性文件:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
让我们创建RabbitMQConfig
类并向其中添加以下内容:
package net.javaguides.springboot.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Value("${rabbitmq.queue.name}")
private String queue;
@Value("${rabbitmq.queue.json.name}")
private String jsonQueue;
@Value("${rabbitmq.exchange.name}")
private String exchange;
@Value("${rabbitmq.routing.key}")
private String routingKey;
@Value("${rabbitmq.routing.json.key}")
private String routingJsonKey;
// spring bean for rabbitmq queue
@Bean
public Queue queue(){
return new Queue(queue);
}
// spring bean for queue (store json messages)
@Bean
public Queue jsonQueue(){
return new Queue(jsonQueue);
}
// spring bean for rabbitmq exchange
@Bean
public TopicExchange exchange(){
return new TopicExchange(exchange);
}
// binding between queue and exchange using routing key
@Bean
public Binding binding(){
return BindingBuilder
.bind(queue())
.to(exchange())
.with(routingKey);
}
// binding between json queue and exchange using routing key
@Bean
public Binding jsonBinding(){
return BindingBuilder
.bind(jsonQueue())
.to(exchange())
.with(routingJsonKey);
}
@Bean
public MessageConverter converter(){
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(converter());
return rabbitTemplate;
}
// ConnectionFactory
// RabbitTemplate
// RabbitAdmin
}
让我们了解一下上述配置。
我们已经为两个队列创建了两个Springbean:
// spring bean for rabbitmq queue
@Bean
public Queue queue(){
return new Queue(queue);
}
// spring bean for queue (store json messages)
@Bean
public Queue jsonQueue(){
return new Queue(jsonQueue);
}
我们已经创建了Springbean来配置Exchange:
// spring bean for rabbitmq exchange
@Bean
public TopicExchange exchange(){
return new TopicExchange(exchange);
}
我们已经创建了一个Springbean,用于使用路由键在“javaguides”队列和交换之间绑定:
// binding between queue and exchange using routing key
@Bean
public Binding binding(){
return BindingBuilder
.bind(queue())
.to(exchange())
.with(routingKey);
}
我们已经创建了一个Springbean,用于“javaguides_json”队列和使用路由键的交换之间的绑定:
// binding between json queue and exchange using routing key
@Bean
public Binding jsonBinding(){
return BindingBuilder
.bind(jsonQueue())
.to(exchange())
.with(routingJsonKey);
}
我们已将MessageConverter配置为RabbitTemplate以进行JSON序列化和反序列化:
@Bean
public MessageConverter converter(){
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(converter());
return rabbitTemplate;
}
确保将以下属性添加到application.properties
文件中:
rabbitmq.queue.name=javaguides
rabbitmq.exchange.name=javaguides_exchange
rabbitmq.routing.key=javaguides_routing_key
rabbitmq.queue.json.name=javaguides_json
rabbitmq.routing.json.key=javaguides_routing_json_key
我们将创建两个RabbitMQ生产者,一个发送String消息,另一个发送JSON消息。我们将使用RabbitTemplate
转换并使用RabbitMQ发送消息。它是一个助手类,与Spring中存在的许多其他Template类(如JdbcTemplate、KafkaTemplate等)一样。
让我们创建RabbitMQProducer
并向其中添加以下内容:
package net.javaguides.springboot.publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
public class RabbitMQProducer {
@Value("${rabbitmq.exchange.name}")
private String exchange;
@Value("${rabbitmq.routing.key}")
private String routingKey;
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQProducer.class);
private RabbitTemplate rabbitTemplate;
public RabbitMQProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage(String message){
LOGGER.info(String.format("Message sent -> %s", message));
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}
让我们首先创建一个简单的User
POJO类来序列化/反序列化:
package net.javaguides.springboot.dto;
import lombok.Data;
@Data
public class User {
private int id;
private String firstName;
private String lastName;
}
现在,让我们创建RabbitMQJsonProducer
并向其中添加以下内容:
package net.javaguides.springboot.publisher;
import net.javaguides.springboot.dto.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
public class RabbitMQJsonProducer {
@Value("${rabbitmq.exchange.name}")
private String exchange;
@Value("${rabbitmq.routing.json.key}")
private String routingJsonKey;
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonProducer.class);
private RabbitTemplate rabbitTemplate;
public RabbitMQJsonProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendJsonMessage(User user){
LOGGER.info(String.format("Json message sent -> %s", user.toString()));
rabbitTemplate.convertAndSend(exchange, routingJsonKey, user);
}
}
型
我们将创建两个REST API,一个用于发送String消息,另一个用于传输JSON消息。
让我们创建一个简单的RESTAPI,从客户端获取消息,将该消息发送给RabbitMQ生产者。
让我们创建一个MessageController
并向其中添加以下内容:
package net.javaguides.springboot.controller;
import net.javaguides.springboot.publisher.RabbitMQProducer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/v1")
public class MessageController {
private RabbitMQProducer producer;
public MessageController(RabbitMQProducer producer) {
this.producer = producer;
}
// http://localhost:8080/api/v1/publish?message=hello
@GetMapping("/publish")
public ResponseEntity<String> sendMessage(@RequestParam("message") String message){
producer.sendMessage(message);
return ResponseEntity.ok("Message sent to RabbitMQ ...");
}
}
型
让我们创建一个REST API,从客户端获取JSON消息,将该JSON消息发送给RabbitMQ生产者。
让我们创建一个MessageJsonController
并向其中添加以下内容:
package net.javaguides.springboot.controller;
import net.javaguides.springboot.dto.User;
import net.javaguides.springboot.publisher.RabbitMQJsonProducer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/v1")
public class MessageJsonController {
private RabbitMQJsonProducer jsonProducer;
public MessageJsonController(RabbitMQJsonProducer jsonProducer) {
this.jsonProducer = jsonProducer;
}
@PostMapping("/publish")
public ResponseEntity<String> sendJsonMessage(@RequestBody User user){
jsonProducer.sendJsonMessage(user);
return ResponseEntity.ok("Json message sent to RabbitMQ ...");
}
}
型
我们将创建两个RabbitMQ消费者,一个使用String消息,另一个使用JSON消息
RabbitMQ消费者是 该服务将负责根据您自己的业务逻辑的需要读取消息并进行处理
让我们创建RabbitMQConsumer
类并向其中添加以下内容:
package net.javaguides.springboot.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class RabbitMQConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQConsumer.class);
@RabbitListener(queues = {"${rabbitmq.queue.name}"})
public void consume(String message){
LOGGER.info(String.format("Received message -> %s", message));
}
}
型
让我们创建RabbitMQJsonConsumer
类,并向其中添加以下内容:
package net.javaguides.springboot.consumer;
import net.javaguides.springboot.dto.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class RabbitMQJsonConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonConsumer.class);
@RabbitListener(queues = {"${rabbitmq.queue.json.name}"})
public void consumeJsonMessage(User user){
LOGGER.info(String.format("Received JSON message -> %s", user.toString()));
}
}
型
我们使用@RabbitListener
注解配置消费者。这里传递的唯一参数是队列的名称。消费者不知道这里的交换或路由密钥。@RabbitListener
将触发Spring内部的逻辑,以查找从JSON到该特定类的转换器。
运行Spring引导应用程序并进行GET REST API调用,以向RabbitMQ发送String消息:
在本教程中,您学习了如何在SpringBootRabbitMQ应用程序中创建多个队列。您还学习了如何使用RabbitMQ在Spring引导应用程序中向生产者和消费者发送和接收消息。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://www.javaguides.net/2022/07/spring-boot-rabbitmq-multiple-queues.html
内容来源于网络,如有侵权,请联系作者删除!