消费者在消费 MQ
中的消息时,MQ
已把消息发送给消费者,消费者在给MQ
返回 ack
时网络中断,故 MQ
未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。
MQ
消费者的幂等性的解决一般使用全局 ID
或者写个唯一标识比如时间戳 或者 UUID
或者订单消费者消费 MQ
中的消息也可利用 MQ
的该 id
来判断,或者可按自己的规则生成一个全局唯一 id
,每次消费消息时用该 id
先判断该消息是否已消费过。
在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。业界主流的幂等性有两种操作:
id
是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。redis
执行 setnx
命令,天然具有幂等性。从而实现不重复消费。在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall
商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis
来存放的定时轮询,大家都知道 redis
只能用 List
做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用 RabbitMQ
进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级。
/
左边的是编号,右边的是优先级。1. 页面中如何添加
2. 代码形式如何添加
1. 生产者代码
package com.xiao.helloworld;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Producer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置IP地址
factory.setHost("192.168.123.129");
// 用户名
factory.setUsername("admin");
// 密码
factory.setPassword("123");
// 创建连接
Connection connection = factory.newConnection();
// 获取信道
Channel channel = connection.createChannel();
/** * 声明队列 * 1.队列名称 * 2.队列里面的消息是否持久化(磁盘)默认情况下消息存储在内存中 * 3.该队列是否提供一个消费者进行消费,就是是否进行消息共享 * 4.就是当最后一个消费者断开连接之后,该队列是否自动删除消息 * 5.其他参数 */
Map<String,Object> map = new HashMap<>();
map.put("x-max-priority",10);
channel.queueDeclare(QUEUE_NAME,true,false,false,map);
// 发消息
for (int i = 0; i < 10; i++) {
String message = "info" + i;
if(message.equals("info5")){
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().priority(5).build();
channel.basicPublish("",QUEUE_NAME,basicProperties,message.getBytes(StandardCharsets.UTF_8));
}else{
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
}
}
/** * 发送一个消息 * 1.发送到哪个交换机 * 2.路由的Key值,也就是本次队列的名称 * 3.其他参数信息 * 4.发送消息的消息内容 */
System.out.println("消息发送完毕");
}
}
2. 消费者代码
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.123.129");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println(new String(var2.getBody()));
};
CancelCallback cancelCallback = var1->{
System.out.println("消费消息被中断");
};
/** * 接收消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 * 3.消费者未成功消费的回调 * 4.消费者取消消费的回调 */
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
3. 测试结果
info5
的消息最先被消费。default
和lazy
lazy模式的网页形式配置
lazy模式的代码形式配置
Map<String,Object> map = new HashMap<>();
map.put("x-queue-mode","lazy");
channel.queueDeclare(QUEUE_NAME,true,false,false,map);
1 百万
条消息,每条消息大概占 1KB
的情况下,普通队列占用内存是 1.2GB
,而惰性队列仅仅占用 1.5MB
。版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_56727438/article/details/122142486
内容来源于网络,如有侵权,请联系作者删除!