提示:RaabitMQ消息队列的学习。
RabbitMQ 是一个消息中间件:
它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包AMQP
,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用1. 我们把erlang环境与rabbitMQ 安装包解压到Linux
2. rpm -ivh erlang安装包
3. yum install socat -y 安装依赖 / rpm -ivh socat依赖包 --force --nodeps
4. rpm -ivh rabbitmq安装包
1. 开启服务 /sbin/service rabbitmq-server start / service rabbitmq-server start
2. 停止服务 service rabbitmq-server stop
3. 重启服务 service rabbitmq-server restart
1. rabbitmq-plugins enable rabbitmq_management
1. 创建rabbitMQ账号
rabbitmqctl add_user 用户名 密码
2. 设置用户角色
rabbitmqctl set_user_tags 用户名 administrator #设置用户名为超级管理员
3. 设置用户权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
4. 查看rabbitmq的用户和角色
rabbitmqctl list_users
5. 登录rabbitMQ 界面: Linux虚拟机ip:15672 即可
记得开放15672端口
访问 Linux虚拟机ip:15672 即可
输入账户密码后看到这个界面代表成功
Docker安装
1. docker pull rabbitmq:3-management
2. 开启rabbitMQ
docker run \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
<dependencies>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
</dependencies>
/** * 生产者:发消息 */
public class Producer {
//队列名称
public static final String QUEUE_NAME="hello";
//发消息
public static void main(String[] args) throws Exception{
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//工厂IP连接rabbitMQ队列
factory.setHost("ip地址");
//设置用户名密码
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
//创建连接
Connection connection = factory.newConnection();
//通过连接来获取 信道来发消息
Channel channel = connection.createChannel();
/** * 生成一个队列 * 1.队列名称 * 2.队列里面的信息是否持久化 默认false 信息存储在内存中 * 3.该列队是否只供一个消费者进行消费,是否进行消息共享 * true:可以多个消费者消费 * false:只能一个消费者消费 * 4.是否自动删除,最后一个消费者断开连接后,该队列是否自动删除 * true:自动删除 * false:不自动删除 * 5.其他参数 */
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//发消息
String message="hello rabbitMQ";
/** * 发送一个消息 * 1.发送到哪个交换机 * 2.路由的KEY值是哪个? 指的是本次队列的名称 * 3.其他参数信息 * 4.发送的消息体 */
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完毕");
channel.close();
connection.close();
}
}
/** * 消费者:接收消息 */
public class Consumer {
//队列名称,接收此队列的消息
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception{
//创建连接工程
ConnectionFactory factory = new ConnectionFactory();
//工厂IP连接rabbitMQ队列
factory.setHost("ip地址");
//设置用户名密码
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
//创建连接
Connection connection = factory.newConnection();
//通过连接来获取 信道来收消息
Channel channel = connection.createChannel();
//声明 接收消息的回调
DeliverCallback deliverCallback=(consumerTag, message)-> {
//message:包含消息头和消息体,我们只想拿到消息体
//若不进行转换,直接输出message我们拿到的则是地址
String data = new String(message.getBody());
System.out.println(new String(message.getBody()));
};
//声明 取消消费的回调
CancelCallback cancelCallback=consumerTag->{
System.out.println("消费消息被中断");
};
/** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动答应 * true:代表自动应答 * false:手动应答 * 3.消费成者成功消费的回调 * 4.消费者取消消费的回调 */
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
public class ProducerWorkQueue {
//队列名称
public static final String QUEUE_NAME="hello";
//发消息
public static void main(String[] args) throws Exception{
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//工厂IP连接rabbitMQ队列
factory.setHost("ip地址");
//设置用户名密码
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
//创建连接
Connection connection = factory.newConnection();
//通过连接来获取 信道来发消息
Channel channel = connection.createChannel();
/** * 生成一个队列 * 1.队列名称 * 2.队列里面的信息是否持久化 默认false 信息存储在内存中 * 3.该列队是否只供一个消费者进行消费,是否进行消息共享 * true:可以多个消费者消费 * false:只能一个消费者消费 * 4.是否自动删除,最后一个消费者断开连接后,该队列是否自动删除 * true:自动删除 * false:不自动删除 * 5.其他参数 */
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
for (int i = 1; i <= 10; i++) {
//发消息
String message=i+"hello rabbitMQ";
/** * 发送一个消息 * 1.发送到哪个交换机 * 2.路由的KEY值是哪个? 指的是本次队列的名称 * 3.其他参数信息 * 4.发送的消息体 */
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完毕");
}
channel.close();
connection.close();
}
}
/** * 消费者:接收消息 */
public class ConsumerWorkQueues1 {
//队列名称,接收此队列的消息
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception{
//创建连接工程
ConnectionFactory factory = new ConnectionFactory();
//工厂IP连接rabbitMQ队列
factory.setHost("ip地址");
//设置用户名密码
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
//创建连接
Connection connection = factory.newConnection();
//通过连接来获取 信道来收消息
Channel channel = connection.createChannel();
//声明 接收消息的回调
DeliverCallback deliverCallback=(consumerTag, message)-> {
//message:包含消息头和消息体,我们只想拿到消息体
//若不进行转换,直接输出message我们拿到的则是地址
String data = new String(message.getBody());
System.out.println(new String(message.getBody()));
};
//声明 取消消费的回调
CancelCallback cancelCallback=consumerTag->{
System.out.println("消费消息被中断");
};
/** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动答应 * true:代表自动应答 * false:手动应答 * 3.消费成者成功消费的回调 * 4.消费者取消消费的回调 */
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
/** * 消费者:接收消息 */
public class ConsumerWorkQueues2 {
//队列名称,接收此队列的消息
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception{
//创建连接工程
ConnectionFactory factory = new ConnectionFactory();
//工厂IP连接rabbitMQ队列
factory.setHost("ip地址");
//设置用户名密码
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
//创建连接
Connection connection = factory.newConnection();
//通过连接来获取 信道来收消息
Channel channel = connection.createChannel();
//声明 接收消息的回调
DeliverCallback deliverCallback=(consumerTag, message)-> {
//message:包含消息头和消息体,我们只想拿到消息体
//若不进行转换,直接输出message我们拿到的则是地址
String data = new String(message.getBody());
System.out.println(new String(message.getBody()));
};
//声明 取消消费的回调
CancelCallback cancelCallback=consumerTag->{
System.out.println("消费消息被中断");
};
/** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动答应 * true:代表自动应答 * false:手动应答 * 3.消费成者成功消费的回调 * 4.消费者取消消费的回调 */
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
//消费者生成的队列
channel.queueDeclare(QUEUE_NAME,(durable)true/false,false,false,null);
//MessageProperties.PERSISTENT_TEXT_PLAIN:将消息进行持久化
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
channel.queueDeclare(QUEUE_NAME,false,false,(autoDelete的参数位置)false,null);
若开启了自动应答,rabbitMQ消息队列分配给消费者10个数据,只要消费者拿到消息队列的数据时,就会告诉消息队列,数据处理完毕。
若当我们处理到第5个数据时,消费者出现了宕机,死掉了,则会出现数据丢失
channel.basicConsume(QUEUE_NAME,(autoAck是否自动应答)false,deliverCallback,cancelCallback);
业务场景:
当我们的两个消费者执行业务时,a消费者执行速度快,b消费者执行速度慢,我们想让执行速度快的多执行,应当如何实现呢?
开启不公平分发,能者多劳 channel.basicQos(1); 0:轮询机制 1:能者多劳
开启手动确认
消费者a
/** * 消费者:接收消息 */
public class ConsumerWorkQueues1 {
//队列名称,接收此队列的消息
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//开启不公平分发,能者多劳
channel.basicQos(1);
DeliverCallback deliverCallback=(consumerTag, message)-> {
String data = new String(message.getBody());
System.out.println(new String(message.getBody()));
//参数1:确认队列中那个具体的消息:
// 可以获取消息的id
// 消息routingkey
// 交换机 exchange
// 消息和重传标志
//参数2:是否开启多个消息同时确认
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback=consumerTag->{
System.out.println("消费消息被中断");
};
/** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动答应 * true:代表自动应答 * false:手动应答 * 3.消费成者成功消费的回调 * 4.消费者取消消费的回调 */
//关闭自动应答 false
channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
}
}
public class ConsumerWorkQueues2 {
//队列名称,接收此队列的消息
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//开启不公平分发,能者多劳
channel.basicQos(1);
//声明 接收消息的回调
DeliverCallback deliverCallback=(consumerTag, message)-> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new String(message.getBody()));
//手动确认消息:
//参数1:确认队列中那个具体的消息 参数2:是否开启多个消息同时确认
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//声明 取消消费的回调
CancelCallback cancelCallback=consumerTag->{
System.out.println("消费消息被中断");
};
/** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动答应 * true:代表自动应答 * false:手动应答 * 3.消费成者成功消费的回调 * 4.消费者取消消费的回调 */
//关闭自动应答 false
channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
}
消费者b执行
public class ProducerWorkQueue {
//队列名称
public static final String QUEUE_NAME="hello";
//发消息
public static void main(String[] args) throws Exception{
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Scanner scanner = new Scanner(System.in);
while (true){
String msg = scanner.nextLine();
channel.basicPublish("",QUEUE_NAME, null,msg.getBytes());
System.out.println("消息发送完毕");
}
}
}
public class ConsumerWorkQueues1 {
//队列名称,接收此队列的消息
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception{
//创建连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
//创建连接
Connection connection = factory.newConnection();
//通过连接来获取 信道来收消息
Channel channel = connection.createChannel();
//声明 接收消息的回调
DeliverCallback deliverCallback=(consumerTag, message)-> {
//message:包含消息头和消息体,我们只想拿到消息体
//若不进行转换,直接输出message我们拿到的则是地址
String data = new String(message.getBody());
System.out.println("消费者1===>"+new String(message.getBody()));
try {
int i=3/0;//模拟业务发生异常
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}catch (Exception e){
System.out.println("拒收消息发生了异常");
//拒收消息
//参数一:表示投递的消息标签
//参数二:是否开启多个消息同时确认
//参数三:是否重新给队列发送
channel.basicNack(message.getEnvelope().getDeliveryTag(),false,true);
}
};
//声明 取消消费的回调
CancelCallback cancelCallback=consumerTag->{
System.out.println("消费消息被中断");
};
/** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动答应 * true:代表自动应答 * false:手动应答 * 3.消费成者成功消费的回调 * 4.消费者取消消费的回调 */
//关闭自动应答 false
channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
}
}
public class ConsumerWorkQueues2 {
//队列名称,接收此队列的消息
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception{
//创建连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
//创建连接
Connection connection = factory.newConnection();
//通过连接来获取 信道来收消息
Channel channel = connection.createChannel();
//声明 接收消息的回调
DeliverCallback deliverCallback=(consumerTag, message)-> {
//message:包含消息头和消息体,我们只想拿到消息体
//若不进行转换,直接输出message我们拿到的则是地址
System.out.println("睡10秒");
try {
Thread.sleep(1000*10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new String(message.getBody()));
//手动确认消息:
//参数1:确认队列中那个具体的消息 参数2:是否开启多个消息同时确认
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//声明 取消消费的回调
CancelCallback cancelCallback=consumerTag->{
System.out.println("消费消息被中断");
};
/** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动答应 * true:代表自动应答 * false:手动应答 * 3.消费成者成功消费的回调 * 4.消费者取消消费的回调 */
//关闭自动应答 false
channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
}
}
此时我们查看消费者a,出现了本应该是消费者b消费的消息bb
channel.basicQos(1); 0:轮询机制 1:能者多劳 若值>1代表当前队列的预取值,代表当前队列大概会拿到多少值
广播模式
,当我们的P消费者发送了消息,交给了X(交换机),所有绑定了这个X(交换机)的队列都可以接收到P消费者发送的消息public class Provider {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//将通道声明指定交换机, 参数一:交换机名称 参数二:交换机类型 fanout广播类型
//参数2:交换机类型也可使用 BuiltinExchangeType. 的方式来查看选择
channel.exchangeDeclare("order", "fanout");
channel.basicPublish("order","",null,"fanout type message".getBytes());
channel.close();
connection.close();
}
}
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("order","fanout");
//获取临时队列名称
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName,"order","");
channel.basicConsume(queueName,true,(consumerTag,message)->{
System.out.println("消费者1===>"+new String(message.getBody()));
},consumerTag -> System.out.println("取消消费消息"));
}
}
routing值订阅模型-Direct(直连)
在上面广播模式中,一条消息,会被所有订阅的队列都消费。但是在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange
在Direct模型下:
队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
消息的发送方在Exchange发送消息时,也必须指定消息的RoutingKey
Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的RoutingKey与消息的Routing Key完全一致,才会接受到消息
生产者
public class Provider {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//通过信道声明交换机, 参数一:交换机名称 参数二:direct 路由模式
channel.exchangeDeclare("logsExchange","direct");
//发送消息 参数一:发送信息到的交换机名称
// 参数二:绑定路由 发送给队列的那个路由key,
//只有当队列的路由key与交换机的路由key相对应时,队列才会接受到消息
channel.basicPublish("logsExchange","msgRouting",null,"routing logs direct info 发送了消息".getBytes());
channel.close();
connection.close();
}
}
public class Consumer1 {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs","direct");
//获取临时队列名
String queueName = channel.queueDeclare().getQueue();
//绑定队列:参数一:临时队列名称 参数二:绑定的交换机名称 参数三:路由key,若消费者的路由key与生产者的路由key相同则可以收到消息
channel.queueBind(queueName,"logsExchange","infoRouting");
channel.queueBind(queueName,"logsExchange","msgRouting");
channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));
}
}
public class Consumer2 {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs","direct");
//获取临时队列名
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,"logs","error");
channel.queueBind(queueName,"logs","msg");
channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));
}
}
#通配符
* (star) can substitute for exactly one word :匹配一个词
# (hash) can substitute for zero or more words :匹配一个或多个词
public class Provider {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//通过信道声明交换机, 参数一:交换机名称 参数二:topic 动态路由
channel.exchangeDeclare("order","topic");
String routingKey="user.order";
//发送消息 参数一:发送信息到的交换机名称 参数二:绑定路由 发送给队列的那个路由key
channel.basicPublish("order",routingKey,null,("routing logs topic发送了消息"+routingKey).getBytes());
channel.close();
connection.close();
}
}
public class Consumer1 {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("order","topic");
//获取临时队列名
String queueName = channel.queueDeclare().getQueue();
//绑定队列:参数一:临时队列名称 参数二:绑定的交换机名称 参数三:动态通配符路由key
channel.queueBind(queueName,"order","user.*");
channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));
}
}
public class Consumer2 {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("order","topic");
//获取临时队列名
String queueName = channel.queueDeclare().getQueue();
//绑定队列:参数一:临时队列名称 参数二:绑定的交换机名称 参数三:动态通配符路由key
channel.queueBind(queueName,"order","user.#");
channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));
}
}
死信,顾名思义就是无法被消费的信息,字面意思可以这样理解,一般来说,producer将消息投递到queue里,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,自然就有了死信队列
为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
生产者
:给正产的消息队列发送消息,并且设置消息过期时间为10S,超过10S消息未被消费,则消息进入死信队列public class TTLProvider {
//普通交换机名称
public static final String NORMAL_EXCHANGE="normal_exchange";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip");
factory.setUsername("账户");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//发送死信 设置TTL过期时间
AMQP.BasicProperties properties=new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 1; i <= 10; i++) {
String msg=""+i;
channel.basicPublish(NORMAL_EXCHANGE,"normal",properties,msg.getBytes());
}
System.out.println("结束发送");
}
}
正常队列消费者
public class TTLConsumer1 {
//普通交换机名称
public static final String NORMAL_EXCHANGE="normal_exchange";
//死信交换机名称
public static final String DEAD_EXCHANGE="dead_exchange";
//普通队列名称
public static final String NORMAL_QUEUE="normal_queue";
//死信队列名称
public static final String DEAD_QUEUE="dead_queue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip");
factory.setUsername("账户");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明普通交换机和死信交换机
channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
//声明普通队列
HashMap<String, Object> map = new HashMap<>();
//当消息被拒绝接受/未被消费 会将消息转发到死信队列
//正常队列设置死信交换机
map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//设置死信队列的routingKey
map.put("x-dead-letter-routing-key","dead");
channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
//声明死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
//绑定普通交换机与普通队列
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");
//绑定死信交换机与死信队列
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");
DeliverCallback deliverCallback=( consumerTag, message)->{
System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));
};
CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);
}
}
死信队列消费者
public class TTLConsumer2 {
//死信队列名称
public static final String DEAD_QUEUE="dead_queue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip");
factory.setUsername("账户");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
DeliverCallback deliverCallback=( consumerTag, message)->{
System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));
};
CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
}
}
结果:
当设置了死信队列,和TTL过期时间,若超过了过期时间消息未被消费,则消息会转发到死信队列配置类
@Configuration
public class RabbitMQConfiguration {
//普通交换机
public static final String X_EXCHANGE="X";
//死信交换机
public static final String Y_DEAD_LETTER_EXCHANGE="Y";
//普通队列
public static final String QUEUE_A="QA";
public static final String QUEUE_B="QB";
//死信队列
public static final String DEAD_QUEUE_D="QD";
//声明普通x交换机
@Bean
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
//声明死信交换机
@Bean
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
//声明普通队列A TTL:10S
@Bean
public Queue queueA(){
Map<String,Object> arg=new HashMap<>();
//设置死信交换机
arg.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
//设置死信routingKey
arg.put("x-dead-letter-routing-key","YD");
//设置TTL过期时间
arg.put("x-message-ttl",10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arg).build();
}
//声明普通队列B TTL:40S
@Bean
public Queue queueB(){
Map<String,Object> arg=new HashMap<>();
//设置死信交换机
arg.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
//设置死信routingKey
arg.put("x-dead-letter-routing-key","YD");
//设置TTL过期时间
arg.put("x-message-ttl",40000);
return QueueBuilder.durable(QUEUE_B).withArguments(arg).build();
}
//死信队列
@Bean
public Queue queueD(){
return QueueBuilder.durable(DEAD_QUEUE_D).build();
}
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
TTL生产者
@RestController
@RequestMapping("/ttl")
@Slf4j
public class TTLProvider {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/{msg}")
public void sendMsg(@PathVariable("msg") String msg){
log.info("当前发送时间:{}发送了一条消息",new Date().toString());
rabbitTemplate.convertAndSend("X","XA","TTL消息延迟为10S,消息为===>"+msg);
rabbitTemplate.convertAndSend("X","XB","TTL消息延迟为40S,消息为===>"+msg);
}
}
死信队列消费者
@Component
@Slf4j
public class DeadLetterConsumer {
@RabbitListener(queues = "QD")
public void t1(Message message, Channel channel)throws Exception{
log.info("收到死信队列的消息{},时间为{}",new String(message.getBody(),"UTF-8"),new Date().toString());
}
}
死信队列-TTL过期时间测试结果
public class Producer {
//普通交换机名称
public static final String NORMAL_EXCHANGE="normal_exchange";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
for (int i = 1; i <= 10; i++) {
String msg=""+i;
channel.basicPublish(NORMAL_EXCHANGE,"normal",null,msg.getBytes());
}
}
}
public class Consumer01 {
//普通交换机名称
public static final String NORMAL_EXCHANGE="normal_exchange";
//死信交换机名称
public static final String DEAD_EXCHANGE="dead_exchange";
//普通队列名称
public static final String NORMAL_QUEUE="normal_queue";
//死信队列名称
public static final String DEAD_QUEUE="dead_queue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明普通交换机和死信交换机
channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
//声明普通队列
HashMap<String, Object> map = new HashMap<>();
//正常队列设置死信交换机
map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//设置死信队列的routingKey
map.put("x-dead-letter-routing-key","dead");
//设置当前正常队列的长度限制超过长度,后面的消息会进入到死信队列
map.put("x-max-length",6);
channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
//声明死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
//绑定普通交换机与普通队列
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");
//绑定死信交换机与死信队列
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");
DeliverCallback deliverCallback=( consumerTag, message)->{
System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));
};
CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);
}
}
public class Consumer02 {
//死信队列名称
public static final String DEAD_QUEUE="dead_queue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
DeliverCallback deliverCallback=( consumerTag, message)->{
System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));
};
CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
}
}
public class Producer {
//普通交换机名称
public static final String NORMAL_EXCHANGE="normal_exchange";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
for (int i = 1; i <= 10; i++) {
String msg="info"+i;
channel.basicPublish(NORMAL_EXCHANGE,"normal",null,msg.getBytes());
}
}
}
public class Consumer01 {
//普通交换机名称
public static final String NORMAL_EXCHANGE="normal_exchange";
//死信交换机名称
public static final String DEAD_EXCHANGE="dead_exchange";
//普通队列名称
public static final String NORMAL_QUEUE="normal_queue";
//死信队列名称
public static final String DEAD_QUEUE="dead_queue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip");
factory.setUsername("登录账户");
factory.setPassword("登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明普通交换机和死信交换机
channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
//声明普通队列
HashMap<String, Object> map = new HashMap<>();
//正常队列设置死信交换机
map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//设置死信队列的routingKey
map.put("x-dead-letter-routing-key","dead");
channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
//声明死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
//绑定普通交换机与普通队列
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");
//绑定死信交换机与死信队列
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");
DeliverCallback deliverCallback=( consumerTag, message)->{
String msg=new String(message.getBody());
if("info5".equals(msg)){
System.out.println("Consumer1接收消息===>"+msg+"此消息被拒绝");
//此消息被拒接,是否重新放回正常队列, false:不放回 则会放到死信队列
channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
}else {
System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}
};
CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);
//开启手动应答
channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,cancelCallback);
}
}
public class Consumer02 {
//死信队列名称
public static final String DEAD_QUEUE="dead_queue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
DeliverCallback deliverCallback=( consumerTag, message)->{
System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));
};
CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
}
}
@Configuration
public class QueueConfig {
@Bean("exchange")
public DirectExchange exchange(){
return new DirectExchange("msg");
}
@Bean("simpleQue")
public Queue simpleQue(){
HashMap<String, Object> map = new HashMap<>();
//设置死信交换机
map.put("x-dead-letter-exchange","dead");
//设置死信路由
map.put("x-dead-letter-routing-key","deadKey");
//消息失效时间
map.put("x-message-ttl",10000);
return new Queue("simple",false,false,false,map);
}
@Bean
public Binding simpleQueueBandingExchange(@Qualifier("simpleQue") Queue simple,@Qualifier("exchange") DirectExchange msg)throws Exception{
return BindingBuilder.bind(simple).to(msg).with("info");
}
@Bean("deadExchange")
public DirectExchange exchange1(){
return new DirectExchange("dead");
}
@Bean("deadQueue")
public Queue deadQ(){
return new Queue("deadQue",false,false,false,null);
}
@Bean
public Binding deadKeyBindingDeadExchange(@Qualifier("deadQueue")Queue queue,@Qualifier("deadExchange")DirectExchange dead){
//绑定死信队列到死信交换机通过路由
return BindingBuilder.bind(queue).to(dead).with("deadKey");
}
}
@RestController
public class Provider {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/ttl/{message}")
public void t1(@PathVariable String message){
String queueName="simple";
Date date = new Date();
System.out.println(date);
rabbitTemplate.convertAndSend("msg","info",message);
}
}
@Component
public class Consumer {
@RabbitListener(queues = "deadQue")
public void hello(Message msg, Channel channel)throws Exception{
System.out.println("接收到消息"+new String(msg.getBody()));
Date date1 = new Date();
System.out.println(date1);
}
}
@Component
public class confirmConfig {
public static final String CONFIRM_EXCHANGE_NAME="confirm.exchange";
public static final String CONFIRM_QUEUE="confirm.queue";
public static final String CONFIRM_ROUTING_KEY="confirm";
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
@Bean("confirmQueue")
public Queue confirmQueue(){
return new Queue(CONFIRM_QUEUE);
}
@Bean
public Binding confirmQueueBindingConfirmExchange(@Qualifier("confirmExchange")DirectExchange confirmExchange,@Qualifier("confirmQueue")Queue confirmQueue){
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
}
@Component
@Slf4j
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
//注入
@PostConstruct //当所有注解执行完后,再执行这个注解
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
/** * 交换机确认回调方法 * 发消息,交换机接收到了,回调 * 参数 * 1. correlationData:保存消息的ID及相关信息,这个消息是我们生产者手动传入的 * 2. 交换机收到消息 true * 3. null */
/** * 交换机确认回调方法 * 发消息,交换机接收失败,回调 * 参数 * 1. correlationData:保存消息的ID及相关信息 * 2. 交换机收到消息 false * 3. cause:失败的原因 */
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id=correlationData!=null?correlationData.getId():"";
if(b){
log.info("交换机已经收到了ID为{}的消息",id);
}else {
log.info("交换机为收到了ID为{}的消息,原因是:{}",id,s);
}
}
}
@RestController
public class ConfirmProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{msg}")
public void t1(@PathVariable String msg){
CorrelationData correlationData = new CorrelationData();
correlationData.setId("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY,"嘿嘿嘿".getBytes(),correlationData);
}
}
@Component
public class ConfirmConsumer {
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE)
public void consumer(Message message){
System.out.println("高级特性确认发布消费者收到了消息===>"+new String(message.getBody()));
}
}
@Component
@Slf4j
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
//注入
@PostConstruct //当所有注解执行完后,再执行这个注解
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
/** * 交换机确认回调方法 * 发消息,交换机接收到了,回调 * 参数 * 1. correlationData:保存消息的ID及相关信息 * 2. 交换机收到消息 true * 3. null */
/** * 交换机确认回调方法 * 发消息,交换机接收失败,回调 * 参数 * 1. correlationData:保存消息的ID及相关信息 * 2. 交换机收到消息 false * 3. cause:失败的原因 */
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id=correlationData!=null?correlationData.getId():"";
if(b){
log.info("交换机已经收到了ID为{}的消息",id);
}else {
log.info("交换机未收到了ID为{}的消息,原因是:{}",id,s);
}
}
/** * 消息传递过程中 不可达 消费者的队列时将消息返回给生产者 * 只有当消息 不可达 目的地的时候 才进行回调 * 参数1:消息体 * 参数2:回复代码 * 参数3:回复原因 * 参数4:交换机 * 参数5:路由key */
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
log.info("消息{},被交换机{}退回,原因是{},路由是{}",new String(message.getBody()),s1,s,s2);
}
}
public class PriorityProducer {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//设置优先级参数
AMQP.BasicProperties build = new AMQP.BasicProperties.Builder().priority(10).build();
for (int i = 1; i <= 10; i++) {
String msg="info"+i;
if(i==5){
channel.basicPublish("","hi",build,msg.getBytes());
}else {
channel.basicPublish("","hi",null,msg.getBytes());
}
}
}
}
public class PriorityConsumer {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
HashMap<String, Object> map = new HashMap<>();
//设置当前队列为优先级队列
map.put("x-max-priority",10);
channel.queueDeclare("hi",false,false,false,map);
channel.basicConsume("hi",true,(consumerTag,message)->{
System.out.println("优先级队列接收消息顺序===>"+new String(message.getBody()));
},(consumerTag) -> System.out.println("取消回调"));
}
}
参数一:
prefetchSize:预先载入的大小 0表示不限制大小参数二:
prefetchCount:预先载入的消息条数参数三:
global:false注意:autoAck手动应答一定要为false
//设置每次确定一个消息
channel.basicQos(0,1,false);
生产者
public class AckProvider {
//队列名称
public static final String QUEUE_NAME="hello_Ack";
//发消息
public static void main(String[] args) throws Exception{
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip");
factory.setUsername("用户");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Scanner scanner = new Scanner(System.in);
while (true){
String msg = scanner.nextLine();
channel.basicPublish("",QUEUE_NAME, null,msg.getBytes());
System.out.println("消息发送完毕");
}
}
}
消费者
public class AckConsumer2 {
//队列名称,接收此队列的消息
public static final String QUEUE_NAME="hello_Ack";
public static void main(String[] args) throws Exception{
//创建连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip");
factory.setUsername("用户");
factory.setPassword("密码");
//创建连接
Connection connection = factory.newConnection();
//通过连接来获取 信道来收消息
Channel channel = connection.createChannel();
//声明 接收消息的回调
DeliverCallback deliverCallback=(consumerTag, message)-> {
//message:包含消息头和消息体,我们只想拿到消息体
//若不进行转换,直接输出message我们拿到的则是地址
System.out.println(new String(message.getBody()));
try {
Thread.sleep(1000*5);
} catch (InterruptedException e) {
e.printStackTrace();
}
//手动确认消息:
//参数1:确认队列中那个具体的消息 参数2:是否开启多个消息同时确认
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//声明 取消消费的回调
CancelCallback cancelCallback=consumerTag->{
System.out.println("消费消息被中断");
};
//每次只消费一个
channel.basicQos(0,1,false);
/** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动答应 * true:代表自动应答 * false:手动应答 * 3.消费成者成功消费的回调 * 4.消费者取消消费的回调 */
//关闭自动应答 false
channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
}
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/m0_50677223/article/details/119978068
内容来源于网络,如有侵权,请联系作者删除!