我的rabbitmq客户端遇到了赛车条件问题。我的服务有多个示例监听单个队列,将接收到的消息存储到数据库中。
当它们同时重新启动时,我有时会看到消息被重新发送并存储在数据库中两次。这通常在客户端通过检查correlationid是否已经存储在数据库中来处理。这在99.9%的时间里都有效(我每天处理500万条消息,每天发生一两次)。
正如我所说,我怀疑这是赛车造成的。我想我再次收到消息,而我的第一个消息仍在处理中。所以当我检查时,我没有看到它存储在数据库中,最后,存储两次。
我不应该说这是一个没有问题,但一直困扰着我,因为我不能真正解释发生了什么。
我怀疑这是在我重新启动服务时发生的。我想我从队列中断开了连接,而我仍然在处理消息,触发rabbitmq再次重新交付到另一个尚未关闭的示例。
我想做的是当我停止服务时,
- 告诉rabbitmq我不想再收到消息
- 等待所有当前正在处理的消息完成
- 发送ACK/NACK
- 关闭
现在我首先注销接收到的事件
_consumerServer.Received -= MessageReceived;
然后我正在处理通道和服务器
if (_channel != null)
{
_channel.Close();
_channel.Dispose();
}
if (_connectionServer != null)
{
_connectionServer.Close();
_connectionServer.Dispose();
}
1条答案
按热度按时间xsuvu9jc1#
您应该正确地处理重新传递,而不是尝试关闭使用者以使消息不会被重新传递。检查并处理在消息上设置
redelivered
标志的情况,并采取适当的措施。您还应该尝试以存储操作是幂等的方式存储消息-即。它可以发生多次,您的数据库中只有一条记录。请参阅团队在这里提供的指导方针:
https://www.rabbitmq.com/reliability.html#consumer