我正在使用RabbitMQ,我有一个队列保存电子邮件消息。我的消费者服务将消息从队列中取出并尝试发送它们。如果由于某种原因,我的消费者无法发送消息,我希望将消息重新排队以便再次发送。我知道我可以执行basicNack并将queue标志设置为true,但是,我不希望无限期地将消息重新排队(比方说,如果我们的电子邮件系统出现故障,我不想继续对未发送的消息进行重新排队)。我希望定义一个有限的次数,以便对要再次发送的消息进行重新排队。但是,我不能在电子邮件消息对象上设置字段,当我将它出列并发送一个nack时。更新的字段不存在于队列中的消息上。有没有其他方法可以处理这个问题?提前感谢。
4条答案
按热度按时间f1tvaqid1#
RabbitMQ(以及AMQP协议)中没有类似重试尝试的功能。
实现重试次数限制行为的可能解决方案:
1.如果消息以前没有被重新传递,则重新传递它(检查
basic.deliver
方法上的redelivered
参数-您的库应该有一些用于此的接口),并丢弃它,然后在dead letter exchange中捕获,然后以某种方式处理。1.每次无法处理消息时,都要重新发布它,但要设置或增加/减少报头字段,比如
x-redelivered-count
(不过,你可以选择任何你喜欢的名称)。在这种情况下,要控制重新传递,你必须检查你设置的字段是否达到了某个限制(顶部或底部- 0是我的选择,a-lattl
在tcp/ip的ip报头中)。1.在Redis,memcache或其他存储器中,甚至在mysql中存储消息的唯一密钥(比如uuid,但你必须在发布消息时手动设置),并在每次重新发送时增加/减少该值,直到达到限制。
1.(对于真正极客来说)编写插件来实现你想要的行为。
#3的优点是重新传递的消息会停留在队列头。如果您有很长的队列或者消息顺序对您很重要,这一点很重要(注意,重新传递会破坏严格的消息顺序,详情请参见官方文档或this question on SO)。
附言:
本主题中有similar answer,但在php中,通读一下,也许会对你有一点帮助(从文字开始阅读 “有多种技术来处理循环重交付问题”。
ctrmrzij2#
虽然这是一个老问题,但我认为现在可以通过死信交换和在消息为死信时添加的x-death头数组的组合来轻松地实现这一点:
死信处理作业会将名为x-death的数组新增至每个死信消息的信头。此数组包含每个死信事件的项目,由一对{queue,reason}识别。每个此类项目都是一个表格,包含数个字段:
queue:消息被发送死信之前所在队列的名称
原因:废字原因,见下文
时间:消息被死记硬背为64位AMQP 0 - 9 - 1时间戳的日期和时间
exchange-消息发布到的交换(请注意,如果消息被多次发送死信,则这将是死信交换)
路由密钥:发布消息时使用的路由关键字(包括抄送关键字,但不包括密件抄送关键字)
count:由于此原因,此消息在此队列中死信的次数
original-expiration(如果消息由于每消息TTL而被死信):返回消息的原始过期属性。过期属性将在死信时从消息中删除,以防止消息在路由到的任何队列中再次过期。
阅读此great article了解更多信息
请检查此图表:
t0ybt7op3#
您应该使用仲裁队列类型。该队列类型有一个传递限制参数,用于指定在删除邮件之前重试传递邮件的次数。
创建队列时指定下一个参数:
1tu0hz3e4#
从我的Angular 来看,这里更好的想法是在使用者内部实现死信交换和重试逻辑的组合。
在下面,你可以找到一个用node-amqp和Rabbitmq https://github.com/kharandziuk/dead-letter-exchange-prototype实现的死信交换的原型。