rabbitmq 出现异常时将消息重新排队

v1l68za4  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(2)|浏览(259)

我正在寻找一种可靠的方法来重新排队的消息,不能正确处理-在这个时候。
我一直在研究http://dotnetcodr.com/2014/06/16/rabbitmq-in-net-c-basic-error-handling-in-receiver/,它似乎支持在RabbitMQ API中对消息进行重新排队。

else //reject the message but push back to queue for later re-try
{
    Console.WriteLine("Rejecting message and putting it back to the queue: {0}", message);
    model.BasicReject(deliveryArguments.DeliveryTag, true);
}

但是我用的是EasyNetQ,所以我想知道我会怎么做类似的事情。

bus.Subscribe<MyMessage>("my_subscription_id", msg => {
    try
    {
        // do work... could be long running
    }
    catch ()
    {
        // something went wrong - requeue message
    }
});

如果do work超过RabbitMQ服务器等待ACK的超时时间,消息可能会导致问题。

pbwdgjma

pbwdgjma1#

所以我想出了这个解决方案。它取代了EasyNetQ默认的错误策略。

public class DeadLetterStrategy : DefaultConsumerErrorStrategy
{
    public DeadLetterStrategy(IConnectionFactory connectionFactory, ISerializer serializer, IEasyNetQLogger logger, IConventions conventions, ITypeNameSerializer typeNameSerializer)
    : base(connectionFactory, serializer, logger, conventions, typeNameSerializer)
    {
    }

    public override AckStrategy HandleConsumerError(ConsumerExecutionContext context, Exception exception)
    {
        object deathHeaderObject;
        if (!context.Properties.Headers.TryGetValue("x-death", out deathHeaderObject))
            return AckStrategies.NackWithoutRequeue;

        var deathHeaders = deathHeaderObject as IList;

        if (deathHeaders == null)
            return AckStrategies.NackWithoutRequeue;

        var retries = 0;
        foreach (IDictionary header in deathHeaders)
        {
            var count = int.Parse(header["count"].ToString());
            retries += count;
        }

        if (retries < 3)
            return AckStrategies.NackWithoutRequeue;
        return base.HandleConsumerError(context, exception);
    }
}

您可以将其替换为:

RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>())

你必须使用AdvancedBus,所以你必须手动设置一切。

using (var bus = RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>()))
{
    var deadExchange = bus.Advanced.ExchangeDeclare("exchange.text.dead", ExchangeType.Direct);
    var textExchange = bus.Advanced.ExchangeDeclare("exchange.text", ExchangeType.Direct);
    var queue = bus.Advanced.QueueDeclare("queue.text", deadLetterExchange: deadExchange.Name);
    bus.Advanced.Bind(deadExchange, queue, "");
    bus.Advanced.Bind(textExchange, queue, "");

    bus.Advanced.Consume<TextMessage>(queue, (message, info) => HandleTextMessage(message, info));
}

这将死信一个失败的消息3次。之后,它将进入由EasyNetQ提供的默认错误队列进行错误处理。* 您可以订阅该队列。*
当一个异常传播出你的消费者方法时,一个消息就是死信。

static void HandleTextMessage(IMessage<TextMessage> textMessage, MessageReceivedInfo info)
{
    throw new Exception("This is a test!");
}
eni9jsuy

eni9jsuy2#

据我所知,没有办法用EasyNetQ手动发送acknackreject消息。
我看到你有关于这个的opened an issue ticket with the EasyNetQ team ...但是还没有答案。
FWIW,这是一件非常合适的事情。我使用的所有库都支持这个特性集(在NodeJS中),这是常见的。我很惊讶EasyNetQ不支持这个。

相关问题