参考学习视频链接: https://www.bilibili.com/video/BV1xm4y1D7Tz
参考松哥文章:https://mp.weixin.qq.com/s/YPmW9_d4YdcjShqf255g7g
JMS规范
JMS即Java消息服务(Java Message Service)应用程序接口
,是一个Java
平台中关于面向消息中间件的API
,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
JMS是一种与厂商无关的 API,用来访问收发系统消息,它类似于JDBC
。这里,JDBC
是可以用来访问许多不同关系数据库的 API,而 JMS
则提供同样与厂商无关的访问方法,以访问消息收发服务。
JMS消息通常有两种类型:
基于JMS规范的实现:
AMQP在2003年时被提出,最早用于解决金融领域不同平台之间的消息传递交互问题。顾名思义,AMQP是一个协议,更准确的来说是一种binary wirelevel protocol(连接协议)。这是其和JMS的本质差别,AMQP不在API层进行限定,而是直接定义网络交换的数据格式。
在 AMQP 协议中,消息收发涉及到如下一些概念:
Message Broker
。AMQP
的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ 提供的服务时,可以划分出多个 vhost
,每个用户在自己的 vhost 中创建 exchange/queue
等。TCP 连接
,断开连接的操作只会在 client 端进行,Broker 不会断开连接,除非出现网络故障或 broker 服务出现问题。Connection
,在消息量大的时候建立 TCP Connection
的开销将是巨大的,效率也较低。Channel 是在 Connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 Thread 创建单独的 Channel 进行通讯,AMQP method
包含了 Channel id
帮助客户端和 Message Broker
识别 Channel,所以 Channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP Connection 的开销。routing key
,分发消息到 queue
中去。常用的类型有:direct
(点对点), topic
(发布订阅) 以及 fanout
(广播)。binding
中可以包含 routing key
,Binding
信息被保存到 Exchange
中的查询表中,作为 Message
的分发依据。基于AMQP协议的实现:
对于我们 Java 工程师而言,大家日常接触较多的应该是 JMS 和 AMQP 协议,既然 JMS 和 AMQP 都是协议,那么两者有什么区别呢?
项目 | AMQP | JMS |
---|---|---|
定义 | 线级协议 | Java API |
跨平台 | 是 | 否 |
跨语言 | 是 | 否 |
消息收发模型 | 4种消息收发模型:Direct、Fanout、Topic、Header | 2种消息收发模型: P2P、Pub/Sub |
消息类型 | 二进制数据类型 | 5种消息类型:Text、Object、Bytes、Stream、Map |
消息流 | Producer将消息发送到Exchange,Exchange将消息路由到Queue,Consumer从Queue中消费消息 | Producer将消息发送到Queue或者Topic,Concumer从Queue或者Topic中消费消息 |
ActiveMQ
ActiveMQ 是 Apache 下的一个子项目,使用完全支持 JMS1.1 和 J2EE1.4 规范的 JMS Provider 实现,少量代码就可以高效地实现高级应用场景,并且支持可插拔的传输协议,如:in-VM, TCP, SSL, NIO, UDP, multicast, JGroups and JXTA transports。
ActiveMQ 支持常用的多种语言客户端如 C++、Java、.Net,、Python、 Php、 Ruby 等。
现在的 ActiveMQ 分为两个版本:
这里的 ActiveMQ Classic 就是原来的 ActiveMQ,而 ActiveMQ Artemis 是在 RedHat 捐赠的 HornetQ 服务器代码的基础上开发的,两者代码完全不同,后者支持 JMS2.0,使用基于 Netty 的异步 IO,大大提升了性能,更为神奇的是,后者不仅支持 JMS 协议,还支持 AMQP 协议、STOMP 以及 MQTT,可以说后者的玩法相当丰富。
因此大家在使用时,建议直接选择 ActiveMQ Artemis。
RabbitMQ
RabbitMQ 算是 AMQP 体系下最为重要的产品了,它基于 Erlang 语言开发实现。
RabbitMQ 支持 AMQP、XMPP、SMTP、STOMP 等多种协议,功能强大,适用于企业级开发。
RocketMQ
RocketMQ 是阿里开源的一款分布式消息中间件,原名 Metaq,从 3.0 版本开始改名为 RocketMQ,是阿里参照 Kafka 设计思想使用 Java 语言实现的一套 MQ。RocketMQ 将阿里内部多款 MQ 产品(Notify、Metaq)进行整合,只维护核心功能,去除了所有其他运行时依赖,保证核心功能最简化,在此基础上配合阿里上述其他开源产品实现不同场景下 MQ 的架构,目前主要用于订单交易系统。
RocketMQ 具有以下特点:
对于 Java 工程师而言,这也是一种经常会用到的 MQ。
Kafka
Kafka 是 Apache 下的一个开源流处理平台,由 Scala 和 Java 编写。Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作(网页浏览,搜索和其他用户的行动)流数据。Kafka 的目的是通过 Hadoop 的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
Kafka 具有以下特性:
大数据开发中大家可能会经常接触 Kafka,Java 开发中也会接触,但是相对来说可能接触的少一些。
安装之前最好先查看Erlang和RabbitMQ版本有对应关系https://www.rabbitmq.com/which-erlang.html
1、因为RabbitMQ是用Erlang语言开发的,因此首先要安装Erlang环境
安装wget下载工具:
yum -y install wget
下载erlang安装包 wget下载默认下载在当前目录:
wget https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpm
解压升级erlang包:
rpm -Uvh erlang-solutions-2.0-1.noarch.rpm
安装erlang包:
yum install -y erlang
查看erlang是否安装成功,以及版本信息:
erl -v
2、安装rabbitmq
安装socat(rabbitmq安装是需要依赖该插件):
yum install -y socat
rabbitmq-server文件包的下载:
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.16/rabbitmq-server-3.8.16-1.el8.noarch.rpm
安装rabbitmq:
yum install -y rabbitmq-server-3.8.16-1.el8.noarch.rpm
rabbitmq常用命令:
systemctl start rabbitmq-server
:启动rabbitmq服务systemctl status rabbitmq-server
:查看rabbitmq运行状态systemctl enable rabbitmq-server
:设置rabbitmq服务开机自启动systemctl stop rabbitmq-server
:停止服务systemctl restart rabbitmq-server
:重启服务
3、RabbitMQWeb管理界面授权操作
默认情况下,rabbitmq是没有安装web端的客户端插件,需要安装才可以生效:
rabbitmq-plugins enable rabbitmq_management
安装完毕后,重启服务即可:
systemctl restart rabbitmq-server
访问服务器ip+端口号(默认15672):
若访问不成功,检查是不是服务器的防火墙未关闭!
说明:rabbitmq有一个默认账号和密码: guest
,默认情况下只能在localhost本机下访问,所以需要添加一个远程登录的用户
你会发现这个guest用户只能在本机登录,因为这里我的rabbit服务部署在虚拟机上,而我访问是用我的window访问的,因此就会登录失败。这是我们需要授权账号添加新用户
4、为RabbitMQ新增用户
新增用户:
rabbitmqctl add_user admin admin
设置用户分配操作权限:
rabbitmqctl set_user_tags admin administrator
用户级别:
为用户添加资源权限:
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
登录:
rabbitmqctl常用命令:
rabbitmqctl add_user 账号 密码
:添加账户rabbitmqctl set_user_tags 账号 administrator
:为用户设置级别rabbitmqctl change_password 用户名 新密码
:修改用户密码rabbitmqctl delete_user 用户名
:删除某个用户rabbitmqctl list_users
:显示用户列表rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*"
:为用户设置administrator角色
这张图中涉及到如下一些概念:
官网上有详细介绍这几种模式:https://www.rabbitmq.com/getstarted.html
为了学习这几种模式,我们先来创建一个SpringBoot项目来操作一下RabbitMQ
主要添加两个依赖,Spring Web
和 Spring for RabbitMQ
配置一下RabbitMQ的相关信息:
spring:
# 配置RabbitMQ的相关信息
rabbitmq:
host: 192.168.139.131
# 与rabbitmq通信的端口号 切记这里不是15672,15672是web界面的端口
port: 5672
username: admin
password: admin
# 这里可以不用给,默认就是 /
virtual-host: /
这个咋没有交换机?其实这里使用的是默认的交换机,我们只需要提供一个生产者、一个队列、一个消费者即可。
添加队列:
@Configuration
public class RabbitMqConfig {
public static final String HELLO_WORD_QUEUE = "hello_world_queue";
/**
* 向工厂中注入一个 queue(队列)
* @return
*/
@Bean
Queue queue1() {
// 参数1-name:队列的名称,不可为空
// 参数2-durable:队列是否可持久化,默认为true。
// true:持久化队列,队列的声明会存放到Erlang自带的Mnesia数据库中,所以该队列在服务器重新启动后继续存在。
// false:非持久化队列,队列的声明会存放到内存中,当服务器关闭时内存会被清除,所以该队列在服务器重新启动后不存在。
// 参数3-exclusive:队列是否具有排它性,默认为false。
// true:1) 当连接关闭时connection.close()该队列会自动删除; 2)会对该队列加锁,其他通道channel不能访问该队列。如果强制访问会报异常下述异常,即一个队列只能有一个消费者消费。
// false: 1)当连接关闭时connection.close()该队列不会自动删除;2) 不会对该队列加锁,其他通道channel能访问该队列,即一个队列可以有多个消费者消费。
// 参数4-autoDelete:队列没有任何订阅的消费者时是否自动删除,默认为false。可以通过RabbitMQ Management,查看某个队列的消费者数量。如果为true,当consumers = 0时队列就会自动删除。
return new Queue(HELLO_WORD_QUEUE, true, false, false);
}
}
消息接收者:
@Component
public class MsgReceiver {
/**
* 通过@RabbitListener,指定该方法监听的消息队列,该注解的参数就是监听队列的名字
* @param msg
*/
@RabbitListener(queues = RabbitMqConfig.HELLO_WORD_QUEUE) // 设置要监听的队列
public void handleMsg(String msg) {
System.out.println("接收的消息为:"+msg);
}
}
消息发送者:
为了方便,我们直接在测试方法里发送消息
@SpringBootTest
class RabbitmqSpringbootApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
*/
@Test
void sendMsg() {
// 第一个参数是队列名称 第二个参数是发送的消息
rabbitTemplate.convertAndSend(RabbitMqConfig.HELLO_WORD_QUEUE, "hello world!");
System.out.println("成功发送消息!");
}
}
启动项目,并且执行测试方法之后,我们看一下结果
发送端:
接受端:
web界面:
因为一个queue
中的消息只能被消费一次,因此这里一个message
只能被一个consumer
消费
一个生产者,一个默认的交换机(DirectExchange)。一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手中。消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动 ack,来决定是否要消费某一条消息。
其实该模式和上面那个模式没有太大的不同,不一样的就是接收消息的消费者现在有多个
那我们在上一个模式的基础上稍微改动一下代码
这里我们再增加一个消费者
@Component
public class MsgReceiver {
/**
* 通过@RabbitListener,指定该方法监听的消息队列,该注解的参数就是监听队列的名字
* @param msg
*/
@RabbitListener(queues = RabbitMqConfig.HELLO_WORD_QUEUE) // 设置要监听的队列
public void handleMsg1(String msg) {
System.out.println("msg1 - 接收的消息为:"+msg);
}
/**
* 通过@RabbitListener,指定该方法监听的消息队列,该注解的参数就是监听队列的名字
* @param msg
*/
@RabbitListener(queues = RabbitMqConfig.HELLO_WORD_QUEUE) // 设置要监听的队列
public void handleMsg2(String msg) {
System.out.println("msg2 - 接收的消息为:"+msg);
}
}
一次发送十个消息
/**
* 发送消息
*/
@Test
void sendMsg() {
for (int i = 0; i < 10; i++) {
// 第一个参数是队列名称 第二个参数是发送的消息
rabbitTemplate.convertAndSend(RabbitMqConfig.HELLO_WORD_QUEUE, "小虎牙" + " :" + i);
}
System.out.println("成功发送消息!");
}
让我们来看一下启动之后的结果:(仔细看,其实两个消费者是轮询消费数据的)
其他知识点
消费者可以配置各自的并发能力,进而提高消息的消费能力。也可以配置手动 ack,来决定是否要消费某一条消息。
1、先来看并发能力的配置,如下:
@Component
public class MsgReceiver {
/**
* 通过@RabbitListener,指定该方法监听的消息队列,该注解的参数就是监听队列的名字
* @param msg
*/
@RabbitListener(queues = RabbitMqConfig.HELLO_WORD_QUEUE) // 设置要监听的队列
public void handleMsg1(String msg) {
System.out.println("msg1 - 接收的消息为:"+msg);
}
/**
* concurrency: 指的是并发数量,这个消费者将开启10个子线程区消费消息
* @param msg
*/
@RabbitListener(queues = RabbitMqConfig.HELLO_WORD_QUEUE, concurrency = "10") // 设置要监听的队列
public void handleMsg2(String msg) {
System.out.println("msg2 - 接收的消息为:"+Thread.currentThread().getName()+":"+msg);
}
}
可以看到,第二个消费者我配置了 concurrency 为 10,此时,对于第二个消费者,将会同时存在 10 个子线程去消费消息。
启动项目,在 RabbitMQ 后台也可以看到一共有 11 个消费者。
我们再次发送10个消息测试一下
2、当然消息消费者也可以开启手动 ack,这样可以自行决定是否消费 RabbitMQ 发来的消息,配置手动 ack 的方式如下:
消费者代码:
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.io.IOException;
// 注意一下Channel 和 Message的包,别导错了
@Component
public class MsgReceiver {
@RabbitListener(queues = RabbitMqConfig.HELLO_WORD_QUEUE)
public void handleMsg1(Message message, Channel channel) throws IOException {
System.out.println("接收到的消息内容:"+message.getPayload());
// basicAck:确认消息, 就是告诉 RabbitMQ,这条消息我以及消费成功了
channel.basicAck((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG), false); // basicAck方法的第二个参数来设置是否批量处理 ,false :不批量
}
@RabbitListener(queues = RabbitMqConfig.HELLO_WORD_QUEUE, concurrency = "10")
public void handleMsg2(Message message, Channel channel) throws IOException {
// basicReject: 拒绝消息
channel.basicReject((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG), true); // basicReject的第二个参数 来设置被拒绝的消息是否重新进入队列中
}
}
重新发送一下消息:
一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange上面,那么该消息将会丢失,这是因为在 RabbitMQ 中 Exchange 不具备存储消息的能力,只有队列具备存储消息的能力,如下图:
因为exchange
可以把message
拷贝到不同的queue
中,因此这里一个message
可以被多个consumer
消费
配置类:
@Configuration
public class FanoutConfig {
public static final String FANOUT_QUEUE1 = "fanout_queue1";
public static final String FANOUT_QUEUE2 = "fanout_queue2";
public static final String FANOUT_EXCHANGE = "fanout_exchange";
@Bean
Queue fanoutQueue1() {
return new Queue(FANOUT_QUEUE1, true, false, false);
}
@Bean
Queue fanoutQueue2() {
return new Queue(FANOUT_QUEUE2, true, false, false);
}
@Bean
FanoutExchange fanoutExchange() {
// 参数1: 交换机名称
// 参数2: 交换机是否可持久化
// 参数3: 如果没有与之绑定的队列,是否删除该交换机
return new FanoutExchange(FANOUT_EXCHANGE, true, false);
}
@Bean
Binding fanoutBinding1() {
return BindingBuilder
.bind(fanoutQueue1()) // 要绑定的队列
.to(fanoutExchange()); // 要绑定的交换机
}
@Bean
Binding fanoutBinding2() {
return BindingBuilder
.bind(fanoutQueue2()) // 要绑定的队列
.to(fanoutExchange()); // 要绑定的交换机
}
}
添加消费者:
@Component
public class FanoutReceiver {
@RabbitListener(queues = FanoutConfig.FANOUT_QUEUE1)
public void msgHandler1(String msg) {
System.out.println("FanoutReceiver1"+" : "+msg);
}
@RabbitListener(queues = FanoutConfig.FANOUT_QUEUE2)
public void msgHandler2(String msg) {
System.out.println("FanoutReceiver2"+" : "+msg);
}
}
发送消息:
@SpringBootTest
class RabbitmqSpringbootApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
*/
@Test
void sendMsg() {
// 参数1:交换机名称 参数2:路由key(发布订阅模式下不需要) 参数3:消息
rabbitTemplate.convertAndSend(FanoutConfig.FANOUT_EXCHANGE, null, "hello fanout!");
}
}
测试结果:
DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收
配置类:
/**
* Direct: 这种路由策略,将消息队列绑定到 DirectExchange上, 当消息到达交换机的时候,消息会携带一个 routing_key, 然后交换机会找到名为 routing_key的队列,将消息路由过去
*/
@Configuration
public class DirectConfig {
public static final String DIRECT_QUEUE1 = "direct_queue1";
public static final String DIRECT_QUEUE2 = "direct_queue2";
public static final String DIRECT_EXCHANGE = "direct_exchange";
@Bean
Queue directQueue1() {
return new Queue(DIRECT_QUEUE1, true, false, false);
}
@Bean
Queue directQueue2() {
return new Queue(DIRECT_QUEUE2, true, false, false);
}
/**
* 直连交换机(routing)
* @return
*/
@Bean
DirectExchange directExchange() {
// 参数1: 交换机名称
// 参数2: 交换机是否可持久化
// 参数3: 如果没有与之绑定的队列,是否删除该交换机
return new DirectExchange(DIRECT_EXCHANGE, true, false);
}
@Bean
Binding directBinding1() {
return BindingBuilder
.bind(directQueue1()) // 要绑定的队列
.to(directExchange()) // 要绑定的交换机
.with("aa"); // 设置routing key
}
@Bean
Binding directBinding2() {
return BindingBuilder
.bind(directQueue2()) // 要绑定的队列
.to(directExchange()) // 要绑定的交换机
.with("bb"); // 设置routing key
}
}
添加消费者:
@Component
public class DirectReceiver {
@RabbitListener(queues = DirectConfig.DIRECT_QUEUE1)
public void msgHandler1(String msg) {
System.out.println("DirectReceiver1"+" : "+msg);
}
@RabbitListener(queues = DirectConfig.DIRECT_QUEUE2)
public void msgHandler2(String msg) {
System.out.println("DirectReceiver2"+" : "+msg);
}
}
发送消息:
@SpringBootTest
class RabbitmqSpringbootApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
*/
@Test
void sendMsg() {
// 参数1:交换机名称 参数2:路由key 参数3:消息
rabbitTemplate.convertAndSend(DirectConfig.DIRECT_EXCHANGE, "aa", "发送给绑定aa路由的队列");
// 参数1:交换机名称 参数2:路由key 参数3:消息
rabbitTemplate.convertAndSend(DirectConfig.DIRECT_EXCHANGE, "bb", "发送给绑定bb路由的队列");
}
}
演示结果:
TopicExchange 是比较复杂但是也比较灵活的一种路由策略,在 TopicExchange 中,Queue 通过 routingkey 绑定到 TopicExchange 上,当消息到达 TopicExchange 后,TopicExchange 根据消息的 routingkey 将消息路由到一个或者多个 Queue 上。(改进版的路由模式,比较常用)
配置类:
@Configuration
public class TopicConfig {
public static final String HUAWEI_QUEUE = "huawei_queue1";
public static final String XIAOMI_QUEUE = "xiaomi_queue2";
public static final String PHONE_QUEUE = "phone_queue3";
public static final String TOPIC_EXCHANGE = "topic_exchange";
@Bean
Queue huaweiQueue() {
return new Queue(HUAWEI_QUEUE, true, false, false);
}
@Bean
Queue xiaomiQueue() {
return new Queue(XIAOMI_QUEUE, true, false, false);
}
@Bean
Queue phoneQueue() {
return new Queue(PHONE_QUEUE, true, false, false);
}
/**
* 主题交换机(topic)
* @return
*/
@Bean
TopicExchange topicExchange() {
// 参数1: 交换机名称
// 参数2: 交换机是否可持久化
// 参数3: 如果没有与之绑定的队列,是否删除该交换机
return new TopicExchange(TOPIC_EXCHANGE, true, false);
}
@Bean
Binding xiaomiBingding() {
return BindingBuilder
.bind(xiaomiQueue()) // 绑定的队列
.to(topicExchange()) // 绑定的交换机
.with("xiaomi.#"); // 绑定的 路由key ,这里 # 是一个通配符,表示将来的消息的routing_key 只要是以xiaomi开头,都会被路由到 xiaomiQueue
}
@Bean
Binding huaweiBingding() {
return BindingBuilder
.bind(huaweiQueue()) // 绑定的队列
.to(topicExchange()) // 绑定的交换机
.with("huawei.#"); // 绑定的 路由key ,这里 # 是一个通配符,表示将来的消息的routing_key 只要是以huawei开头,都会被路由到 huaweiQueue
}
@Bean
Binding phoneBingding() {
return BindingBuilder
.bind(phoneQueue()) // 绑定的队列
.to(topicExchange()) // 绑定的交换机
.with("#.phone.#"); // 绑定的 路由key ,这里 # 是一个通配符,表示将来的消息的routing_key 只要包含 phone,都会被路由到 phoneQueue
}
}
添加消费者:
@Component
public class TopicReceiver {
@RabbitListener(queues = TopicConfig.HUAWEI_QUEUE)
public void huawei(String msg) {
System.out.println("huawei"+" : "+msg);
}
@RabbitListener(queues = TopicConfig.XIAOMI_QUEUE)
public void xiaomi(String msg) {
System.out.println("xiaomi"+" : "+msg);
}
@RabbitListener(queues = TopicConfig.PHONE_QUEUE)
public void phone(String msg) {
System.out.println("phone"+" : "+msg);
}
}
发送消息:
@SpringBootTest
class RabbitmqSpringbootApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
*/
@Test
void sendMsg() throws InterruptedException {
// 参数1:交换机名称 参数2:路由key 参数3:消息
rabbitTemplate.convertAndSend(TopicConfig.TOPIC_EXCHANGE, "xiaomi.new", "小米新闻");
// 参数1:交换机名称 参数2:路由key 参数3:消息
rabbitTemplate.convertAndSend(TopicConfig.TOPIC_EXCHANGE, "huawei.new", "华为新闻");
// 参数1:交换机名称 参数2:路由key 参数3:消息
rabbitTemplate.convertAndSend(TopicConfig.TOPIC_EXCHANGE, "huawei.phone.new", "华为手机新闻");
}
}
演示结果:
HeadersExchange 是一种使用较少的路由策略,HeadersExchange 会根据消息的 Header 将消息路由到不同的 Queue 上,这种策略也和 routingkey无关
配置类:
@Configuration
public class HeaderConfig {
public static final String HEADER_QUEUE_NAME = "header_queue_name";
public static final String HEADER_QUEUE_AGE = "header_queue_age";
public static final String HEADER_EXCHANGE_NAME = "header_exchange_name";
@Bean
Queue headerNameQueue() {
return new Queue(HEADER_QUEUE_NAME, true, false, false);
}
@Bean
Queue headerAgeQueue() {
return new Queue(HEADER_QUEUE_AGE, true, false, false);
}
@Bean
HeadersExchange headersExchange() {
return new HeadersExchange(HEADER_EXCHANGE_NAME, true, false);
}
@Bean
Binding nameBinding() {
return BindingBuilder
.bind(headerNameQueue())
.to(headersExchange())
.where("name").exists(); // 如果将来消息头部包含 name 属性,就算匹配成功
}
@Bean
Binding ageBinding() {
return BindingBuilder
.bind(headerNameQueue())
.to(headersExchange())
.where("age").matches(99); // 如果将来消息头部必须要有age,值必须是99 才能匹配成功
}
}
添加消费者:
@Component
public class HeaderReceiver {
@RabbitListener(queues = HeaderConfig.HEADER_QUEUE_NAME)
public void nameReceiver(byte[] msg) {
System.out.println("nameMsgHandler >>> "+ new String(msg, 0, msg.length));
}
@RabbitListener(queues = HeaderConfig.HEADER_QUEUE_AGE)
public void ageReceiver(byte[] msg) {
System.out.println("ageMsgHandler >>> "+ new String(msg, 0, msg.length));
}
}
发送消息:
@SpringBootTest
class RabbitmqSpringbootApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
*/
@Test
void sendMsg() throws InterruptedException {
Message nameMsg = MessageBuilder.withBody("hello zhangsan".getBytes()).setHeader("name", "aaa").build();
rabbitTemplate.send(HeaderConfig.HEADER_EXCHANGE_NAME, null, nameMsg);
Message ageMsg = MessageBuilder.withBody("hello lisi".getBytes()).setHeader("age", 99).build();
rabbitTemplate.send(HeaderConfig.HEADER_EXCHANGE_NAME, null, ageMsg);
}
}
演示结果:
简单模式
和 工作队列模式(work queue)
我们都不需要设置交换机(使用的默认交换机)queue
的message
只能被一个consumer
消费,但是通过一个exchange
的message
可以被分发到多个queue
中,然后被多个consumer
消费。FanoutExchange
DirectExchange
TopicExchange
HeadersExchange
RabbitMQ 中的消息长期未被消费会过期吗?
默认情况下,消息是不会过期的,也就是我们平日里在消息发送时,如果不设置任何消息过期的相关参数,那么消息是不会过期的,即使消息没被消费掉,也会一直存储在队列中。
TTL
(Time-To-Live),消息存活的时间,即消息的有效期。如果我们希望消息能够有一个存活时间,那么我们可以通过设置 TTL
来实现这一需求。如果消息的存活时间超过了 TTL
并且还没有被消息,此时消息就会变成死信
TTL 的设置有两种不同的方式:
那如果两个都设置了呢?
答:以时间短的为准。
当我们设置了消息有效期后,消息过期了就会被从队列中删除了(进入到死信队列,后文一样,不再标注),但是两种方式对应的删除时机有一些差异:
介绍完 TTL 之后,接下来我们来看看具体用法。
单条消息过期
配置类:
这里以direct模式为例
@Configuration
public class Msg_ttlConfig {
public static final String MSG_TTL_QUEUE = "msg_ttl_queue";
public static final String MSG_TTL_EXCHANGE = "msg_ttl_exchange";
// 注入队列
@Bean
Queue msgTtlQueue() {
return new Queue(MSG_TTL_QUEUE, true, false, false);
}
// 注入交换机
@Bean
DirectExchange msgTtlExchange() {
return new DirectExchange(MSG_TTL_EXCHANGE, true, false);
}
// 绑定关系
@Bean
Binding msgTtlQueueBingding() {
return BindingBuilder
.bind(msgTtlQueue())
.to(msgTtlExchange())
.with(MSG_TTL_QUEUE);
}
}
通过接口方法发送消息:
@RestController
public class HelloController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public void hello() {
// 构建消息对象
Message message = MessageBuilder
.withBody("hello TTL".getBytes()) // 消息内容
.setExpiration("10000") // 设置消息过期时间10秒
.build();
// arg1:交换机名 arg2:路由key arg3:消息对象
rabbitTemplate.send(Msg_ttlConfig.MSG_TTL_EXCHANGE, Msg_ttlConfig.MSG_TTL_QUEUE, message);
}
}
项目启动后,我们访问该接口:
web界面:
10秒之后:
队列消息过期
为队列设置过期时间,配置类:
@Configuration
public class Queue_ttlConfig {
public static final String QUEUE_TTL_QUEUE = "queue_ttl_queue";
public static final String QUEUE_TTL_EXCHANGE = "queue_ttl_exchange";
// 注入队列
@Bean
Queue queueTtlQueue() {
HashMap<String, Object> args = new HashMap<>();
// 给消息队列设置过期时间,该队列的消息如果10s没人消费,则过期
args.put("x-message-ttl", 10000);
return new Queue(QUEUE_TTL_QUEUE, true, false, false, args);
}
// 注入交换机
@Bean
DirectExchange queueTtlExchange() {
return new DirectExchange(QUEUE_TTL_EXCHANGE, true, false);
}
// 绑定关系
@Bean
Binding BindingqueueTtl() {
return BindingBuilder
.bind(queueTtlQueue())
.to(queueTtlExchange())
.with(QUEUE_TTL_QUEUE);
}
}
主要代码:
// 注入队列
@Bean
Queue queueTtlQueue() {
HashMap<String, Object> args = new HashMap<>();
// 给消息队列设置过期时间,该队列的消息如果10s没人消费,则过期
args.put("x-message-ttl", 10000);
return new Queue(QUEUE_TTL_QUEUE, true, false, false, args);
}
后面就不做演示了效果,队列中的消息到10秒之后会过期。
被删除的消息去哪了?真的被删除了吗?并非如此,这就涉及到死信队列了,接下来我们来看看死信队列。
死信交换机:
死信交换机,Dead-Letter-Exchange
即 DLX
。死信交换机用来接收死信消息(Dead Message
)的。
那什么是死信消息呢?一般消息变成死信消息有如下几种情况:
当消息在一个队列中变成了死信消息后,此时就会被发送到 DLX
,绑定 DLX
的消息队列则称为死信队列
。DLX
本质上也是一个普普通通的交换机,我们可以为任意队列指定 DLX,当该队列中存在死信时,RabbitMQ 就会自动的将这个死信发布到 DLX 上去,进而被路由到另一个绑定了 DLX 的队列上(即死信队列)。
死信队列:
这个好理解,绑定了死信交换机的队列就是死信队列。
实践
配置死信交换机以及队列:
其实就是普通的交换机 和 普通的队列、绑定
@Configuration
public class DLXConfig {
public static final String DLX_EXCHANGE = "dlx_exchange_name";
public static final String DLX_QUEUE = "dlx_queue_name";
@Bean
DirectExchange dlxDirectExchange() {
return new DirectExchange(DLX_EXCHANGE, true, false);
}
@Bean
Queue dlxQueue() {
return new Queue(DLX_QUEUE, true, false, false);
}
@Bean
Binding dlxBinding() {
return BindingBuilder
.bind(dlxQueue())
.to(dlxDirectExchange())
.with(DLX_QUEUE);
}
}
为正常队列绑定 死信交换机 以及 死信路由key:
@Configuration
public class DLXNormalConfig {
public static final String MSG_EXCHANGE = "msg_exchange_name";
public static final String MSG_QUEUE = "msg_queue_name";
@Bean
DirectExchange MSGDirectExchange() {
return new DirectExchange(MSG_EXCHANGE, true, false);
}
@Bean
Queue MSGQueue() {
Map<String, Object> args = new HashMap<>();
//设置消息过期时间
args.put("x-message-ttl", 0);
//设置死信交换机
args.put("x-dead-letter-exchange", DLXConfig.DLX_EXCHANGE);
//设置死信 routing_key
args.put("x-dead-letter-routing-key", DLXConfig.DLX_QUEUE);
return new Queue(MSG_QUEUE, true, false, false, args);
}
@Bean
Binding MSGBinding() {
return BindingBuilder
.bind(MSGQueue())
.to(MSGDirectExchange())
.with(MSG_QUEUE);
}
}
就两个参数:
x-dead-letter-exchange
:配置死信交换机x-dead-letter-routing-key
:配置死信 routing_key这样就为正常队列(msg_queue_name) 配置好了 对应的死信交换机(dlx_exchange_name) 以及 死信路由key(dlx_queue_name)。
将来发送到这个正常队列(msg_queue_name)的消息,如果发生了 nack、reject 或者过期等问题,就会被发送到 DLX 上,进而进入到与 DLX 绑定的消息队列上。
死信消息队列的消费和普通消息队列的消费并无二致:
我们来监听那个私信队列:
@Component
public class DLXReceiver {
@RabbitListener(queues = DLXConfig.DLX_QUEUE)
public void handle(String msg) {
System.out.println("死信消息:"+msg);
}
}
发送消息:
@GetMapping("/send")
public void hello() {
// 向正常队列发送消息
rabbitTemplate.convertAndSend(DLXNormalConfig.MSG_EXCHANGE, DLXNormalConfig.MSG_QUEUE, "过期的话,应该会到私信队列中");
}
演示结果:
延迟队列存储的对象肯定是对应的延时消息,所谓"延时消息"是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延时队列将订单信息发送到延时队列。
场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到智能设备。
那么RabbitMQ如何实现迟队列呢? 目前有两种实现方案
在rabbitmq 3.5.7及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时插件依赖Erlang/OPT 18.0及以上。
下载安装:
首先我们需要下载 rabbitmq_delayed_message_exchange 插件,这是一个 GitHub 上的开源项目,我们直接下载即可:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
首先进入RabbitMQ的安装目录。上面我们是通过yum
命令来安装RabbitMQ的,它的默认安装路径是/usr/lib/rabbitmq/lib
进入plugins
目录
cd rabbitmq_server-3.8.16/plugins
下载插件:
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
启用插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
查看目前rabbitmq启用的插件:
rabbitmq-plugins list
开始使用:
配置类:
@Configuration
public class DelayConfig {
public static final String DELAY_QUEUE_NAME = "delay_queue";
public static final String DELAY_EXCHANGE_NAME = "delay_exchange";
@Bean
Queue delayQueue() {
return new Queue(DELAY_QUEUE_NAME, true, false, false);
}
@Bean
CustomExchange customExchange() {
Map<String, Object> args = new HashMap<>();
// 指定交换机的类型
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
@Bean
Binding delayedBinding() {
return BindingBuilder
.bind(delayQueue())
.to(customExchange())
.with(DELAY_QUEUE_NAME)
.noargs();
}
}
这里我们使用的交换机是 CustomExchange
,这是一个 Spring 中提供的交换机,创建 CustomExchange
时有五个参数,含义分别如下:
x-delayed-message
最后一个 args 参数中,指定了交换机消息分发的类型,这个类型就是大家熟知的 direct、fanout、topic 以及 header 几种,用了哪种类型,将来交换机分发消息就按哪种方式来。
接下来我们再创建一个消息消费者:
@Component
public class DelayedReceiver {
private static final Logger log = LoggerFactory.getLogger(DelayedReceiver.class);
@RabbitListener(queues = DelayConfig.DELAY_QUEUE_NAME)
public void handleMsg(String msg) {
log.info("handleMsg ---- >> {}", msg);
}
}
发送消息:
@GetMapping("/send")
public void hello() {
Message message = MessageBuilder
.withBody(("hello 延迟队列" + new Date()).getBytes())
// 在消息头中设置延迟时间
.setHeader("x-delay", 3000)
.build();
rabbitTemplate.send(DelayConfig.DELAY_EXCHANGE_NAME, DelayConfig.DELAY_QUEUE_NAME, message);
}
演示结果:
DLX(死信交换机)+TTL(消息超时时间)
我们可以把死信队列就当成延迟队列。具体来说是这样:
假如一条消息需要延迟 30 分钟执行,我们就设置这条消息的有效期为 30 分钟,同时为这条消息配置死信交换机和死信 routing_key,并且不为这个消息队列设置消费者,那么 30 分钟后,这条消息由于没有被消费者消费而进入死信队列,此时我们有一个消费者就在“蹲点”这个死信队列,消息一进入死信队列,就立马被消费了。
配置类:
@Configuration
public class TTL_DLXConfig {
public static final String TEMP_EXCHANGE_NAME = "temp_exchange_name";
public static final String TEMP_QUEUE_NAME = "temp_queue_name";
public static final String DELAY_EXCHANGE_NAME = "delay_exchange_name";
public static final String DELAY_QUEUE_NAME = "delay_queue_name";
// 第一组:
/**
* 死信交换机
* @return
*/
@Bean
DirectExchange dlxExchange1() {
return new DirectExchange(DELAY_EXCHANGE_NAME, true, false);
}
/**
* 死信队列(正在意义上接收消息的队列)
* @return
*/
@Bean
Queue dlxQueue1() {
return new Queue(DELAY_QUEUE_NAME, true, false, false);
}
@Bean
Binding dlxBinding1() {
return BindingBuilder
.bind(dlxQueue1())
.to(dlxExchange1())
.with(DELAY_QUEUE_NAME);
}
// 第二组:
// 正常交换机
@Bean
DirectExchange tempExchage() {
return new DirectExchange(TEMP_EXCHANGE_NAME, true, false);
}
// 正常队列(非正在意义上接收消息的队列)
@Bean
Queue tempQueue() {
Map<String , Object> args = new HashMap<>();
//设置消息过期时间 10秒
args.put("x-message-ttl", 10000);
//设置死信交换机
args.put("x-dead-letter-exchange", DELAY_EXCHANGE_NAME);
//设置死信 routing_key
args.put("x-dead-letter-routing-key", DELAY_QUEUE_NAME);
return new Queue(TEMP_QUEUE_NAME, true, false, false, args);
}
@Bean
Binding msgBinding() {
return BindingBuilder
.bind(tempQueue())
.to(tempExchage())
.with(TEMP_QUEUE_NAME);
}
}
创建消费者:
@Component
public class DelayReceiver2 {
private static final Logger log = LoggerFactory.getLogger(DelayReceiver2.class);
@RabbitListener(queues = TTL_DLXConfig.DELAY_QUEUE_NAME)
public void handleMsg(String msg) {
log.info("handleMsg ---- >> {}", msg);
}
}
发送消息:
@GetMapping("/send")
public void hello() {
Message message = MessageBuilder
.withBody(("hello 延迟队列" + new Date()).getBytes())
.build();
rabbitTemplate.send(TTL_DLXConfig.TEMP_EXCHANGE_NAME, TTL_DLXConfig.TEMP_QUEUE_NAME, message);
}
演示结果:
大家知道,RabbitMQ 中的消息发送引入了 Exchange(交换机)的概念,消息的发送首先到达交换机上,然后再根据既定的路由规则,由交换机将消息路由到不同的 Queue(队列)中,再由不同的消费者去消费。
大致的流程就是这样,所以要确保消息发送的可靠性,主要从两方面去确认:
如何确保消息成功到达 RabbitMQ?RabbitMQ 给出了两种方案:
这两种不同的方案,不可以同时开启,只能选择其中之一,如果两者同时开启,则会报错
AMQP协议提供了事务机制,在投递消息时开启事务支持,如果消息投递失败,则回滚事务
Spring Boot 中开启 RabbitMQ 事务机制的方式如下:
首先需要先提供一个事务管理器,如下:
@Bean
RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
接下来,在消息生产者上面做两件事:添加事务注解并设置通信信道为事务模式:
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 开启事务模式
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
@Service
public class TransaMsgService {
@Autowired
RabbitTemplate rabbitTemplate;
@Transactional
public void send() {
rabbitTemplate.convertAndSend(TransactionalRabbitConfig.TRANSA_MSG_EXCHANGE_NAME, TransactionalRabbitConfig.TRANSA_MSG_QUEUE_NAME, "hello RabbitMQ Transational");
int i = 1 / 0;
}
}
这里注意两点:
@Transactional
注解标记事务。setChannelTransacted
方法设置为 true 开启事务模式。在上面的案例中,我们在结尾来了个 1/0 ,这在运行时必然抛出异常,我们可以尝试运行该方法,发现消息并未发送成功。
当我们开启事务模式之后,RabbitMQ 生产者发送消息会多出四个步骤:
上面的步骤,除了第三步是本来就有的,其他几个步骤都是平白无故多出来的。所以大家看到,事务模式其实效率有点低,这并非一个最佳解决方案。我们可以想想,什么项目会用到消息中间件?一般来说都是一些高并发的项目,这个时候并发性能尤为重要。
所以,RabbitMQ 还提供了发送方确认机制(publisher confirm)来确保消息发送成功,这种方式,性能要远远高于事务模式,一起来看下。
新增配置:
在配置类中,对rabbitTemplete进行处理:
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 为rabbitTemplete设置两个回调方法
*/
@PostConstruct // Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法)
public void initRabbitTemplete() {
// 用来确定消息到达交换的回调方法
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
if (ack) {
System.out.println("消息成功到达交换器");
}else{
// 业务处理 ...
System.out.println("消息发送失败");
}
}
});
// 在消息路由到队列失败时被调用
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
// 业务处理
System.out.println("消息未成功路由到队列");
}
});
}
实现 RabbitTemplate.ConfirmCallback
和 RabbitTemplate.ReturnsCallback
两个接口,这两个接口,前者的回调用来确定消息到达交换器,后者则会在消息路由到队列失败时被调用。如果发送失败,我们可以通过这两个回调方法来进行处理
这里我就不演示结果了,大家知道什么意思就可以了。
上面的确认机制,我们可以知道消息是否发送成功?那么接下来的步骤就是重试了。
重试又分两种情况 自带重试机制 和 业务重试
自带重试机制
在Spring Boot 中有相应的重试机制,但是这个重试机制就和 MQ 本身没有关系了,这是利用 Spring 中的 retry 机制来完成的,具体配置如下:
# 开启重试机制
spring.rabbitmq.template.retry.enabled=true
# 重试起始间隔时间
spring.rabbitmq.template.retry.initial-interval=1000ms
# 最大重试次数
spring.rabbitmq.template.retry.max-attempts=10
# 最大重试间隔时间
spring.rabbitmq.template.retry.max-interval=10000ms
# 间隔时间乘数。(这里配置间隔时间乘数为 2,则第一次间隔时间 1 秒,第二次重试间隔时间 2 秒,第三次 4 秒,以此类推)
spring.rabbitmq.template.retry.multiplier=2
业务重试
业务重试其实就是根据上面所说的那两个回调方法来进行发送消息失败后的处理,属于手工处理。
RabbitMQ 的消息消费,整体上来说有两种不同的思路:
这种方式大家比较常见,就是通过 @RabbitListener 注解去标记消费者,如下:
@Component
public class ConsumerDemo {
@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
public void handle(String msg) {
System.out.println("msg = " + msg);
}
}
当监听的队列中有消息时,就会触发该方法。
拉(pull):
@Test
public void test01() throws UnsupportedEncodingException {
Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.JAVABOY_QUEUE_NAME);
System.out.println("o = " + new String(((byte[]) o),"UTF-8"));
}
调用 receiveAndConvert
方法,方法参数为队列名称,方法执行完成后,会从 MQ 上拉取一条消息下来,如果该方法返回值为 null,表示该队列上没有消息了。receiveAndConvert
方法有一个重载方法,可以在重载方法中传入一个等待超时时间,例如 3 秒。此时,假设队列中没有消息了,则 receiveAndConvert
方法会阻塞 3 秒,3 秒内如果队列中有了新消息就返回,3 秒后如果队列中还是没有新消息,就返回 null,这个等待超时时间要是不设置的话,默认为 0。
总结
这是消息两种不同的消费模式。
如果需要从消息队列中持续获得消息,就可以使用推模式;如果只是单纯的消费一条消息,则使用拉模式即可。切忌将拉模式放到一个死循环中,变相的订阅消息,这会严重影响 RabbitMQ 的性能。
推模式自动确认
consumer自动应答,处理成功(注意:此处的成功确认是没有发生异常)发出ack,处理失败发出nack。rabbitmq发出消息后会等待consumer端应答,只有收到ack确定信息后才会将消息在rabbitmq清除掉。收到nack异常信息的处理方法由setDefaultRequeueReject()方法设置,这种模式下,发送错误的消息可以恢复。
在 Spring Boot 中,默认情况下,消息消费就是自动确认的
@Component
public class ConsumerDemo {
@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
public void handle2(String msg) {
System.out.println("msg = " + msg);
int i = 1 / 0;
}
}
通过 @Componet 注解将当前类注入到 Spring 容器中,然后通过 @RabbitListener 注解来标记一个消息消费方法,默认情况下,消息消费方法自带事务,即如果该方法在执行过程中抛出异常,那么被消费的消息会重新回到队列中等待下一次被消费,如果该方法正常执行完没有抛出异常,则这条消息就算是被消费了。
推模式手动确认
要开启手动确认,需要我们首先关闭自动确认,关闭方式如下:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
接下来我们来看下消费者中的代码:
@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
public void handle(Message message,Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//消息消费的代码写到这里
String s = new String(message.getBody());
System.out.println("s = " + s);
//消费完成后,手动 ack
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
//手动 nack
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
将消费者要做的事情放到一个 try..catch
代码块中。
如果消息正常消费成功,则执行 basicAck
完成确认。
如果消息消费失败,则执行 basicNack
方法,告诉 RabbitMQ 消息消费失败。
这里涉及到两个方法:
id
;第二个参数 multiple
如果为 false
,表示仅确认当前消息消费成功;如果为 true
,则表示当前消息之前所有未被当前消费者确认的消息都消费成id
;第二个参数multiple
如果为 false
,表示仅拒绝当前消息的消费,如果为true
,则表示拒绝当前消息之前所有未被当前消费者确认的消息;第三个参数 requeue
含义和前面所说的一样,被拒绝的消息是否重新入队。当 basicNack 中最后一个参数设置为 false 的时候,那么被拒绝的消息将进入死信队列中
拉模式手动确认
拉模式手动 ack 比较麻烦一些,在 Spring 中封装的 RabbitTemplate 中并未找到对应的方法,所以我们得用原生的办法,如下:
public void receive2() {
Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);
long deliveryTag = 0L;
try {
GetResponse getResponse = channel.basicGet(RabbitConfig.JAVABOY_QUEUE_NAME, false);
deliveryTag = getResponse.getEnvelope().getDeliveryTag();
System.out.println("o = " + new String((getResponse.getBody()), "UTF-8"));
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
这里涉及到的 basicAck 和 basicNack 方法跟前面的一样,我就不再赘述。
当客户端收到消息时,可以选择消费这条消息,也可以选择拒绝这条消息。我们来看下拒绝的方式:
@Component
public class ConsumerDemo {
@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
public void handle(Channel channel, Message message) {
//获取消息编号
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//拒绝消息
channel.basicReject(deliveryTag, true);
} catch (IOException e) {
e.printStackTrace();
}
}
}
消费者收到消息之后,可以选择拒绝消费该条消息,拒绝的步骤分两步:
调用 basicReject 方法时,第二个参数是 requeue,即是否重新入队。如果第二个参数为 true,则这条被拒绝的消息会重新进入到消息队列中,等待下一次被消费;如果第二个参数为 false,则这条被拒绝的消息就会被丢掉,不会有新的消费者去消费它了。
与basicNack方法不同在于:
大家设想下面一个场景:
消费者在消费完一条消息后,向 RabbitMQ 发送一个 ack 确认,此时由于网络断开或者其他原因导致 RabbitMQ 并没有收到这个 ack,那么此时 RabbitMQ 并不会将该条消息删除,当重新建立起连接后,消费者还是会再次收到该条消息,这就造成了消息的重复消费。同时,由于类似的原因,消息在发送的时候,同一条消息也可能会发送两次。种种原因导致我们在消费消息时,一定要处理好幂等性问题。
幂等性问题的处理的思路大概如下:
采用 Redis,在消费者消费消息之前,现将消息的 id 放到 Redis 中,存储方式如下:
如果 ack 失败,在 RabbitMQ 将消息交给其他的消费者时,先执行 setnx,如果 key 已经存在(说明之前有人消费过该消息),获取他的值,如果是 0,当前消费者就什么都不做,如果是 1,直接 ack。
极端情况:第一个消费者在执行业务时,出现了死锁,在 setnx 的基础上,再给 key 设置一个生存时间。生产者,发送消息时,指定 messageId。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_45464560/article/details/123895491
内容来源于网络,如有侵权,请联系作者删除!