Spring Boot 整合 RabbitMQ

x33g5p2x  于2022-02-11 转载在 Spring  
字(7.3k)|赞(0)|评价(0)|浏览(470)

1.安装rabbitmq

可参考这篇文章：https://www.jianshu.com/p/14ffe0f3db94

2.SpringBoot整合Rabbitmq案例

2.1 Direct模式(点对点)

1.docker上启动rabbitmq实例
2.pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Spring Boot + RabbitMQ (AMQP) demo application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- Inherit build defaults from the Spring Boot parent POM. The dependencies and the plugin
         below declare no <version>, so their versions are expected to come from this parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <!-- Coordinates of this demo project. -->
    <groupId>com.yl</groupId>
    <artifactId>amqp</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>amqp</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <!-- Java language level used for compilation. -->
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <!-- Spring AMQP / RabbitMQ support (Queue, exchanges, BindingBuilder, @RabbitListener). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- Spring MVC web starter. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Test-scoped dependencies: not packaged with the application. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Spring Boot Maven plugin (packaging and running the application). -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

3.application.properties

# RabbitMQ connection settings
# Broker host
# NOTE(review): hard-coded address of the author's environment - replace with your own broker host.
spring.rabbitmq.host=192.168.244.135
# Broker AMQP port
spring.rabbitmq.port=5672
# Username
# NOTE(review): a stock RabbitMQ installation only accepts the "guest" user from localhost;
# confirm the broker at the host above allows remote guest logins, or use a dedicated user.
spring.rabbitmq.username=guest
# Password
spring.rabbitmq.password=guest

4.Direct模式配置类

package com.yl.amqp.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the AMQP topology for the direct-exchange example: one queue, one
 * direct exchange, and the binding that connects them through a routing key.
 */
@Configuration
public class DirectConfig {

    private static final String QUEUE_NAME = "yl-queue";
    private static final String EXCHANGE_NAME = "yl-direct";
    private static final String ROUTING_KEY = "direct";

    /**
     * Declares the queue {@code yl-queue}.
     *
     * @return the queue, created with the durable flag set
     */
    @Bean
    Queue directQueue() {
        final boolean durable = true;
        return new Queue(QUEUE_NAME, durable);
    }

    /**
     * Declares the direct exchange {@code yl-direct}.
     *
     * @return the exchange, created durable and without auto-delete
     */
    @Bean
    DirectExchange directExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(EXCHANGE_NAME, durable, autoDelete);
    }

    /**
     * Binds {@link #directQueue()} to {@link #directExchange()} using the
     * routing key {@code direct}.
     *
     * @return the binding between the queue and the exchange
     */
    @Bean
    Binding directBiding() {
        final Queue queue = directQueue();
        final DirectExchange exchange = directExchange();
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}

5.监听消息队列

package com.yl.amqp.receicer;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer for the direct-exchange example: prints every message received
 * from the queue {@code yl-queue}.
 */
@Component
public class DirectReceiver {

    private static final String QUEUE_NAME = "yl-queue";

    /**
     * Listener callback for {@code yl-queue}; writes the message to standard output.
     *
     * @param msg the message payload as a string
     */
    @RabbitListener(queues = QUEUE_NAME)
    public void handler(String msg) {
        final java.io.PrintStream out = System.out;
        out.println(msg);
    }
}

6.测试

2.2 Fanout模式(点对面)

1.Fanout模式配置

package com.yl.amqp.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the AMQP topology for the fanout example: two queues that are both
 * bound to one fanout exchange (fanout bindings carry no routing key).
 */
@Configuration
public class FanoutConfig {

    private static final String FIRST_QUEUE = "queue-one";
    private static final String SECOND_QUEUE = "queue-two";
    private static final String EXCHANGE_NAME = "yl-fanout";

    /**
     * Declares the first queue, {@code queue-one}, with the durable flag set.
     *
     * @return the first queue
     */
    @Bean
    Queue queueOne() {
        final boolean durable = true;
        return new Queue(FIRST_QUEUE, durable);
    }

    /**
     * Declares the second queue, {@code queue-two}, with the durable flag set.
     *
     * @return the second queue
     */
    @Bean
    Queue queueTwo() {
        final boolean durable = true;
        return new Queue(SECOND_QUEUE, durable);
    }

    /**
     * Declares the fanout exchange {@code yl-fanout}.
     *
     * @return the exchange, created durable and without auto-delete
     */
    @Bean
    FanoutExchange fanoutExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new FanoutExchange(EXCHANGE_NAME, durable, autoDelete);
    }

    /**
     * Binds {@link #queueOne()} to the fanout exchange.
     *
     * @return the binding for the first queue
     */
    @Bean
    Binding bindingOne() {
        final Queue first = queueOne();
        return BindingBuilder.bind(first).to(fanoutExchange());
    }

    /**
     * Binds {@link #queueTwo()} to the fanout exchange.
     *
     * @return the binding for the second queue
     */
    @Bean
    Binding bindingTwo() {
        final Queue second = queueTwo();
        return BindingBuilder.bind(second).to(fanoutExchange());
    }
}

2.监听消息

package com.yl.amqp.receicer;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumers for the fanout example: one listener per queue, each printing the
 * received message with a prefix that identifies the handler.
 */
@Component
public class FanoutReceiver {

    private static final String FIRST_QUEUE = "queue-one";
    private static final String SECOND_QUEUE = "queue-two";

    /**
     * Listener callback for {@code queue-one}.
     *
     * @param msg the message payload as a string
     */
    @RabbitListener(queues = FIRST_QUEUE)
    public void handler1(String msg) {
        print("handle1()", msg);
    }

    /**
     * Listener callback for {@code queue-two}.
     *
     * @param msg the message payload as a string
     */
    @RabbitListener(queues = SECOND_QUEUE)
    public void handle2(String msg) {
        print("handle2()", msg);
    }

    /** Writes the prefix immediately followed by the message to standard output. */
    private static void print(String prefix, String msg) {
        System.out.println(prefix + msg);
    }
}

3.测试

2.3 Topic模式(点对面)

1.Topic模式配置

package com.yl.amqp.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the AMQP topology for the topic example: three queues bound to one
 * topic exchange, each with its own routing-key pattern.
 */
@Configuration
public class TopicConfig {

    private static final String EXCHANGE_NAME = "yl-topic";

    /**
     * Declares the queue {@code huawei}.
     *
     * @return the queue, created with the durable flag set
     */
    @Bean
    Queue huawei() {
        return durableQueue("huawei");
    }

    /**
     * Declares the queue {@code apple}.
     *
     * @return the queue, created with the durable flag set
     */
    @Bean
    Queue apple() {
        return durableQueue("apple");
    }

    /**
     * Declares the queue {@code phone}.
     *
     * @return the queue, created with the durable flag set
     */
    @Bean
    Queue phone() {
        return durableQueue("phone");
    }

    /**
     * Declares the topic exchange {@code yl-topic}.
     *
     * @return the exchange, created durable and without auto-delete
     */
    @Bean
    TopicExchange topicExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange(EXCHANGE_NAME, durable, autoDelete);
    }

    /**
     * Binds the {@code huawei} queue with the pattern {@code huawei.#}.
     *
     * @return the binding for the {@code huawei} queue
     */
    @Bean
    Binding huaweiBinding() {
        return bindToTopic(huawei(), "huawei.#");
    }

    /**
     * Binds the {@code apple} queue with the pattern {@code apple.#}.
     *
     * @return the binding for the {@code apple} queue
     */
    @Bean
    Binding appleBinding() {
        return bindToTopic(apple(), "apple.#");
    }

    /**
     * Binds the {@code phone} queue with the pattern {@code #.phone.#}.
     *
     * @return the binding for the {@code phone} queue
     */
    @Bean
    Binding phoneBinding() {
        return bindToTopic(phone(), "#.phone.#");
    }

    /** Creates a queue with the given name and the durable flag set. */
    private static Queue durableQueue(String name) {
        return new Queue(name, true);
    }

    /** Binds the given queue to {@link #topicExchange()} using the given routing-key pattern. */
    private Binding bindToTopic(Queue queue, String pattern) {
        return BindingBuilder.bind(queue).to(topicExchange()).with(pattern);
    }

}

2.监听消息

package com.yl.amqp.receicer;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumers for the topic example: one listener per queue, each printing the
 * received message with a prefix that identifies the handler.
 */
@Component
public class TopicReceiver {

    /**
     * Listener callback for the {@code huawei} queue.
     *
     * @param msg the message payload as a string
     */
    @RabbitListener(queues = "huawei")
    public void handle1(String msg) {
        print("handle1()", msg);
    }

    /**
     * Listener callback for the {@code apple} queue.
     *
     * @param msg the message payload as a string
     */
    @RabbitListener(queues = "apple")
    public void handle2(String msg) {
        print("handle2()", msg);
    }

    /**
     * Listener callback for the {@code phone} queue.
     *
     * @param msg the message payload as a string
     */
    @RabbitListener(queues = "phone")
    public void handle3(String msg) {
        print("handle3()", msg);
    }

    /** Writes the prefix immediately followed by the message to standard output. */
    private static void print(String prefix, String msg) {
        System.out.println(prefix + msg);
    }
}

3.测试1

4.测试2

2.4 Headers模式(很少用到)

1.Headers模式的配置

package com.yl.amqp.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares the AMQP topology for the headers example: two queues bound to one
 * headers exchange, where the binding conditions are expressed on message
 * headers rather than on a routing key.
 */
@Configuration
public class HeaderConfig {

    private static final String AGE_QUEUE = "queue-age";
    private static final String NAME_QUEUE = "queue-name";
    private static final String EXCHANGE_NAME = "yl-headers";

    /**
     * Declares the queue {@code queue-age}.
     *
     * @return the queue, created with the durable flag set
     */
    @Bean
    Queue ageQueue() {
        final boolean durable = true;
        return new Queue(AGE_QUEUE, durable);
    }

    /**
     * Declares the queue {@code queue-name}.
     *
     * @return the queue, created with the durable flag set
     */
    @Bean
    Queue nameQueue() {
        final boolean durable = true;
        return new Queue(NAME_QUEUE, durable);
    }

    /**
     * Declares the headers exchange {@code yl-headers}.
     *
     * @return the exchange, created durable and without auto-delete
     */
    @Bean
    HeadersExchange headersExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new HeadersExchange(EXCHANGE_NAME, durable, autoDelete);
    }

    /**
     * Binds {@code queue-age} with a "match any" condition over a single
     * header entry: {@code age} = {@code 18}.
     *
     * @return the binding for the age queue
     */
    @Bean
    Binding ageBinding() {
        final Map<String, Object> requiredHeaders = new HashMap<>();
        requiredHeaders.put("age", 18);
        final Queue queue = ageQueue();
        return BindingBuilder.bind(queue)
                .to(headersExchange())
                .whereAny(requiredHeaders)
                .match();
    }

    /**
     * Binds {@code queue-name} with the condition that the header
     * {@code name} exists on the message.
     *
     * @return the binding for the name queue
     */
    @Bean
    Binding nameBinding() {
        final Queue queue = nameQueue();
        return BindingBuilder.bind(queue)
                .to(headersExchange())
                .where("name")
                .exists();
    }
}

2.监听消息

package com.yl.amqp.receicer;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumers for the headers example: one listener per queue, each printing the
 * received message with a prefix that names the handler and its queue.
 */
@Component
public class HeadersReceiver {

    private static final String AGE_QUEUE = "queue-age";
    private static final String NAME_QUEUE = "queue-name";

    /**
     * Listener callback for {@code queue-age}.
     *
     * @param msg the message payload as a string
     */
    @RabbitListener(queues = AGE_QUEUE)
    public void handle1(String msg) {
        print("handle1(),queue-age:", msg);
    }

    /**
     * Listener callback for {@code queue-name}.
     *
     * @param msg the message payload as a string
     */
    @RabbitListener(queues = NAME_QUEUE)
    public void handle2(String msg) {
        print("handle2(),queue-name:", msg);
    }

    /** Writes the prefix immediately followed by the message to standard output. */
    private static void print(String prefix, String msg) {
        System.out.println(prefix + msg);
    }
}

3.测试1

4.测试2

相关文章