Spring Boot RabbitMQ多队列示例

x33g5p2x  于2022-10-11 转载在 Spring  
字(13.2k)|赞(0)|评价(0)|浏览(818)

在本教程中,您将学习如何在Spring Boot RabbitMQ应用程序中创建多个队列。您还将学习如何使用RabbitMQ在Spring引导应用程序中向生产者和消费者发送和接收消息。
注意,本教程将使用Springboot3和JDK17或更高版本。
RabbitMQ是一种消息队列软件(消息代理/队列管理器),充当不同应用程序可以发送和接收消息的中间平台。
我们将使用SpringAMQP模块在Spring引导应用程序中与RabbitMQ一起作为AMQP消息传递解决方案。

具有多队列的Spring Boot RabbitMQ体系结构

Producer是一个向RabbitMQ代理发送消息的应用程序,Consumer是从RabbitMQBroker读取消息的应用软件。

在本教程中,我们将实现以下Spring Boot RabbitMQ体系结构流:


我们将创建两个队列:
1.javaguides
2.javaguides_json
在“javaguides”队列中,我们将存储String类型的消息。
在“javaguides_json”队列中,我们将存储json类型的消息。
Exchange组件将使用路由密钥将邮件路由到相应的队列。

先决条件

Docker-使用Docker在本地安装并设置RabbitMQ作为Docker容器。请参阅我在Install RabbitMQ using Docker上的单独指南。

1.在IntelliJ中创建和设置Spring Boot项目

使用https://start.spring.io/创建Spring引导项目,并提供屏幕截图中所示的详细信息:

2.Maven依赖项

以下是完整的pom.xml文件,供您参考:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.0.0-SNAPSHOT</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>net.javaguides</groupId>
	<artifactId>springboot-rabbitmq-tutorial</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>springboot-rabbitmq-tutorial</name>
	<description>Demo project for Spring Boot and RabbitMQ</description>
	<properties>
		<java.version>17</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-rabbit-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<excludes>
						<exclude>
							<groupId>org.projectlombok</groupId>
							<artifactId>lombok</artifactId>
						</exclude>
					</excludes>
				</configuration>
			</plugin>
		</plugins>
	</build>
	<repositories>
		<repository>
			<id>spring-milestones</id>
			<name>Spring Milestones</name>
			<url>https://repo.spring.io/milestone</url>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
		</repository>
		<repository>
			<id>spring-snapshots</id>
			<name>Spring Snapshots</name>
			<url>https://repo.spring.io/snapshot</url>
			<releases>
				<enabled>false</enabled>
			</releases>
		</repository>
	</repositories>
	<pluginRepositories>
		<pluginRepository>
			<id>spring-milestones</id>
			<name>Spring Milestones</name>
			<url>https://repo.spring.io/milestone</url>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
		</pluginRepository>
		<pluginRepository>
			<id>spring-snapshots</id>
			<name>Spring Snapshots</name>
			<url>https://repo.spring.io/snapshot</url>
			<releases>
				<enabled>false</enabled>
			</releases>
		</pluginRepository>
	</pluginRepositories>

</project>

3.项目结构

按照以下屏幕截图创建项目结构:

4.将Spring Boot应用程序与RabbitMQ连接

我们将使用默认用户名和密码“guest/guest”连接到端口5672上的RabbitMQ代理。
Spring引导自动配置将使用默认配置详细信息自动将Spring引导应用程序与RabbitMQ连接起来,但您可以根据应用程序中的环境进行修改。属性文件:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

5.在Spring Boot应用程序中配置RabbitMQ

让我们创建RabbitMQConfig类并向其中添加以下内容:

package net.javaguides.springboot.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Value("${rabbitmq.queue.name}")
    private String queue;

    @Value("${rabbitmq.queue.json.name}")
    private String jsonQueue;

    @Value("${rabbitmq.exchange.name}")
    private String exchange;

    @Value("${rabbitmq.routing.key}")
    private String routingKey;

    @Value("${rabbitmq.routing.json.key}")
    private String routingJsonKey;

    // spring bean for rabbitmq queue
    @Bean
    public Queue queue(){
        return new Queue(queue);
    }

    // spring bean for queue (store json messages)
    @Bean
    public Queue jsonQueue(){
        return new Queue(jsonQueue);
    }

    // spring bean for rabbitmq exchange
    @Bean
    public TopicExchange exchange(){
        return new TopicExchange(exchange);
    }

    // binding between queue and exchange using routing key
    @Bean
    public Binding binding(){
        return BindingBuilder
                .bind(queue())
                .to(exchange())
                .with(routingKey);
    }

    // binding between json queue and exchange using routing key
    @Bean
    public Binding jsonBinding(){
        return BindingBuilder
                .bind(jsonQueue())
                .to(exchange())
                .with(routingJsonKey);
    }

    @Bean
    public MessageConverter converter(){
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(converter());
        return rabbitTemplate;
    }

    // ConnectionFactory
    // RabbitTemplate
    // RabbitAdmin
}

让我们了解一下上述配置。
 
我们已经为两个队列创建了两个Springbean:

// spring bean for rabbitmq queue
    @Bean
    public Queue queue(){
        return new Queue(queue);
    }

    // spring bean for queue (store json messages)
    @Bean
    public Queue jsonQueue(){
        return new Queue(jsonQueue);
    }

我们已经创建了Springbean来配置Exchange:

// spring bean for rabbitmq exchange
    @Bean
    public TopicExchange exchange(){
        return new TopicExchange(exchange);
    }

我们已经创建了一个Springbean,用于使用路由键在“javaguides”队列和交换之间绑定:

// binding between queue and exchange using routing key
    @Bean
    public Binding binding(){
        return BindingBuilder
                .bind(queue())
                .to(exchange())
                .with(routingKey);
    }

我们已经创建了一个Springbean,用于“javaguides_json”队列和使用路由键的交换之间的绑定:

// binding between json queue and exchange using routing key
    @Bean
    public Binding jsonBinding(){
        return BindingBuilder
                .bind(jsonQueue())
                .to(exchange())
                .with(routingJsonKey);
    }

我们已将MessageConverter配置为RabbitTemplate以进行JSON序列化和反序列化:

@Bean
    public MessageConverter converter(){
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(converter());
        return rabbitTemplate;
    }

应用程序。属性文件更改

确保将以下属性添加到application.properties文件中:

rabbitmq.queue.name=javaguides
rabbitmq.exchange.name=javaguides_exchange
rabbitmq.routing.key=javaguides_routing_key
rabbitmq.queue.json.name=javaguides_json
rabbitmq.routing.json.key=javaguides_routing_json_key

6.创建RabbitMQ生产者

我们将创建两个RabbitMQ生产者,一个发送String消息,另一个发送JSON消息。我们将使用RabbitTemplate转换并使用RabbitMQ发送消息。它是一个助手类,与Spring中存在的许多其他Template类(如JdbcTemplate、KafkaTemplate等)一样。

RabbitMQProducer-发送字符串消息

让我们创建RabbitMQProducer并向其中添加以下内容:

package net.javaguides.springboot.publisher;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class RabbitMQProducer {

    @Value("${rabbitmq.exchange.name}")
    private String exchange;

    @Value("${rabbitmq.routing.key}")
    private String routingKey;

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQProducer.class);

    private RabbitTemplate rabbitTemplate;

    public RabbitMQProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendMessage(String message){
        LOGGER.info(String.format("Message sent -> %s", message));
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }
}

RabbitMQJsonProducer-发送JSON消息

让我们首先创建一个简单的User POJO类来序列化/反序列化:

package net.javaguides.springboot.dto;

import lombok.Data;

@Data
public class User {
    private int id;
    private String firstName;
    private String lastName;
}

现在,让我们创建RabbitMQJsonProducer并向其中添加以下内容:

package net.javaguides.springboot.publisher;

import net.javaguides.springboot.dto.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class RabbitMQJsonProducer {

    @Value("${rabbitmq.exchange.name}")
    private String exchange;

    @Value("${rabbitmq.routing.json.key}")
    private String routingJsonKey;

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonProducer.class);

    private RabbitTemplate rabbitTemplate;

    public RabbitMQJsonProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendJsonMessage(User user){
        LOGGER.info(String.format("Json message sent -> %s", user.toString()));
        rabbitTemplate.convertAndSend(exchange, routingJsonKey, user);
    }

}

7.创建REST API以发送字符串和JSON消息

我们将创建两个REST API,一个用于发送String消息,另一个用于传输JSON消息。

消息控制器

让我们创建一个简单的RESTAPI,从客户端获取消息,将该消息发送给RabbitMQ生产者。
让我们创建一个MessageController并向其中添加以下内容:

package net.javaguides.springboot.controller;

import net.javaguides.springboot.publisher.RabbitMQProducer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/v1")
public class MessageController {

    private RabbitMQProducer producer;

    public MessageController(RabbitMQProducer producer) {
        this.producer = producer;
    }

    // http://localhost:8080/api/v1/publish?message=hello
    @GetMapping("/publish")
    public ResponseEntity<String> sendMessage(@RequestParam("message") String message){
        producer.sendMessage(message);
        return ResponseEntity.ok("Message sent to RabbitMQ ...");
    }
}

消息Json控制器

让我们创建一个REST API,从客户端获取JSON消息,将该JSON消息发送给RabbitMQ生产者。
让我们创建一个MessageJsonController并向其中添加以下内容:

package net.javaguides.springboot.controller;

import net.javaguides.springboot.dto.User;
import net.javaguides.springboot.publisher.RabbitMQJsonProducer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/v1")
public class MessageJsonController {

    private RabbitMQJsonProducer jsonProducer;

    public MessageJsonController(RabbitMQJsonProducer jsonProducer) {
        this.jsonProducer = jsonProducer;
    }

    @PostMapping("/publish")
    public ResponseEntity<String> sendJsonMessage(@RequestBody User user){
        jsonProducer.sendJsonMessage(user);
        return ResponseEntity.ok("Json message sent to RabbitMQ ...");
    }
}

8.创建RabbitMQ消费者

我们将创建两个RabbitMQ消费者,一个使用String消息,另一个使用JSON消息
RabbitMQ消费者是 该服务将负责根据您自己的业务逻辑的需要读取消息并进行处理

RabbitMQ消费者-消费字符串消息

让我们创建RabbitMQConsumer类并向其中添加以下内容:

package net.javaguides.springboot.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class RabbitMQConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQConsumer.class);

    @RabbitListener(queues = {"${rabbitmq.queue.name}"})
    public void consume(String message){
        LOGGER.info(String.format("Received message -> %s", message));
    }
}

RabbitMQ消费者-消费JSON消息

让我们创建RabbitMQJsonConsumer类,并向其中添加以下内容:

package net.javaguides.springboot.consumer;

import net.javaguides.springboot.dto.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class RabbitMQJsonConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonConsumer.class);

    @RabbitListener(queues = {"${rabbitmq.queue.json.name}"})
    public void consumeJsonMessage(User user){
        LOGGER.info(String.format("Received JSON message -> %s", user.toString()));
    }
}


我们使用@RabbitListener注解配置消费者。这里传递的唯一参数是队列的名称。消费者不知道这里的交换或路由密钥。
@RabbitListener将触发Spring内部的逻辑,以查找从JSON到该特定类的转换器。

9.演示

运行Spring引导应用程序并进行GET REST API调用,以向RabbitMQ发送String消息:

检查控制台日志

结论

在本教程中,您学习了如何在SpringBootRabbitMQ应用程序中创建多个队列。您还学习了如何使用RabbitMQ在Spring引导应用程序中向生产者和消费者发送和接收消息。

相关文章