📒博客首页:崇尚学技术的科班人
🏇小肖来了
🍣今天给大家带来的文章是《万字 +图片解析死信队列和死信实战演练》🍣
🍣有的小伙伴可能会问死信队列有啥用?你看了这篇文章就知道了🍣
🍣希望各位小伙伴们能够耐心的读完这篇文章🍣
🙏博主也在学习阶段,如若发现问题,请告知,非常感谢🙏
💗同时也非常感谢各位小伙伴们的支持💗
无法被消费的消息
。由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。TTL
过期requeue = false
1. 消费者01
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import java.util.HashMap;
import java.util.Map;
public class Consumer01 {
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DEAD_QUEUE = "dead_queue";
public static final String NORMAL_QUEUE = "normal_queue";
/** * 死信实战 * 消费者01 * @param args * @throws Exception */
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
// 死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 普通交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
map.put("x-dead-letter-routing-key","lisi");
map.put("x-message-ttl",10000);
// 普通队列
channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
// 死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
// 队列绑定
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("Consumer01控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
};
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,var1->{});
}
}
消费者01
,它需要进行 死信交换机绑定死信队列、普通交换机绑定普通队列、普通队列绑定死信交换机。关闭消费者01
。2. 消费者02
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import java.util.HashMap;
import java.util.Map;
public class Consumer02 {
/** * 消费者02 */
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("Consumer02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
};
// 只需要面向 死信队列消费消息,里面的关系绑定已经由C1完成
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{});
}
}
3. 生产者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;
import java.nio.charset.StandardCharsets;
public class Producer {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtil.getChannel();
// 单位是毫秒
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for(int i = 1; i < 11; i ++){
String message = "info" + i;
// 只需要向交换机里面发送消息,里面的关系绑定已经由C1完成
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
}
}
}
4. 测试结果
1. 消费者01
public class Consumer01 {
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DEAD_QUEUE = "dead_queue";
public static final String NORMAL_QUEUE = "normal_queue";
/** * 死信实战 * 消费者01 * @param args * @throws Exception */
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
// 死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 普通交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
map.put("x-dead-letter-routing-key","lisi");
map.put("x-max-length",6);
//map.put("x-message-ttl",10000);
// 普通队列
channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
// 死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
// 队列绑定
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("Consumer01控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
};
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,var1->{});
}
}
队列最大长度
关闭消费者01
。2. 消费者02
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import java.util.HashMap;
import java.util.Map;
public class Consumer02 {
/** * 消费者02 */
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("Consumer02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
};
// 只需要面向 死信队列消费消息,里面的关系绑定已经由C1完成
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{});
}
}
3. 生产者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;
import java.nio.charset.StandardCharsets;
public class Producer {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtil.getChannel();
// 单位是毫秒
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for(int i = 1; i < 11; i ++){
String message = "info" + i;
// 只需要向交换机里面发送消息,里面的关系绑定已经由C1完成
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
}
}
}
4. 测试结果
1. 消费者01
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import java.util.HashMap;
import java.util.Map;
public class Consumer01 {
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DEAD_QUEUE = "dead_queue";
public static final String NORMAL_QUEUE = "normal_queue";
/** * 死信实战 * 消费者01 * @param args * @throws Exception */
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
// 死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 普通交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
map.put("x-dead-letter-routing-key","lisi");
//map.put("x-max-length",6);
//map.put("x-message-ttl",10000);
// 普通队列
channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
// 死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
// 队列绑定
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (var1, var2)->{
String msg = new String(var2.getBody(),"UTF-8");
if(msg.equals("info5")){
System.out.println("Consumer01控制台接收到的消息是:" + msg + ": 此消息被拒" );
channel.basicReject(var2.getEnvelope().getDeliveryTag(),false);
}else{
System.out.println("Consumer01控制台接收到的消息是:" + msg);
channel.basicAck(var2.getEnvelope().getDeliveryTag(),false);
}
};
channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,var1->{});
}
}
队列最大长度
注释掉还需要开启手动应答
,因为不开启就不会存在消息被拒 的问题。2. 消费者02
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import java.util.HashMap;
import java.util.Map;
public class Consumer02 {
/** * 消费者02 */
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("Consumer02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
};
// 只需要面向 死信队列消费消息,里面的关系绑定已经由C1完成
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{});
}
}
3. 生产者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;
import java.nio.charset.StandardCharsets;
public class Producer {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtil.getChannel();
// 单位是毫秒
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for(int i = 1; i < 11; i ++){
String message = "info" + i;
// 只需要向交换机里面发送消息,里面的关系绑定已经由C1完成
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
}
}
}
4. 测试结果
"info5"
被作为死信。版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_56727438/article/details/122081742
内容来源于网络,如有侵权,请联系作者删除!