如何使用EasyNetQ / RabbitMQ进行错误处理

yzckvree  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(4)|浏览(269)

我使用C#中的RabbitMQ和EasyNetQ库。我在这里使用了pub/sub模式。我仍然有一些问题,我希望任何人都能帮助我:
1.如果在使用消息时出现错误,它会自动移动到错误队列中。如何实现重试(以便将它放回原始队列中,并且当它处理X次失败时,将它移动到死信队列中)?
1.就我所见,总是有一个错误队列用于转储来自所有其他队列的消息。我如何才能有一个类型的错误队列,使每个队列都有自己的相关错误队列?
1.我如何轻松地重试错误队列中的消息?我尝试过软管,但它只是将消息重新发布到错误队列,而不是原始队列。我也不太喜欢这个选项,因为我不想在控制台中瞎折腾。最好我只针对错误队列编程。
有没有人?

7fhtutme

7fhtutme1#

您在使用EasyNetQ/RabbitMQ时遇到的问题是,与SQS或Azure Service Bus/Queues等其他消息传递服务相比,它要“原始”得多,但我会尽我所能为您指出正确的方向。

问题1.

最简单的方法是你可以在RabbitMQ/EasyNetQ中对一条消息进行No-Ack,然后它会被放在队列的最前面等待你重试。这并不是真正可取的,因为它几乎会立即被重试(没有时间延迟),并且还会阻止其他消息被处理(如果你有一个预取计数为1的订阅者)。
我见过使用“MessageEnvelope”的其他实现。因此,当消息失败时,您可以使用一个 Package 器类来递增MessageEnvelope上的retry变量,并将消息重新传递回队列。您必须这样做,并编写消息处理程序周围的 Package 代码,这不是EasyNetQ的功能。
使用上面的方法,我也看到人们使用信封,但是允许消息是死信的。一旦它在死信队列中,就有另一个应用程序/工作者从死信队列中读取项目。
上述所有方法都有一个小问题,即在处理消息时,没有任何好的方法来处理对数/指数/任何类型的递增延迟。在将消息返回到队列之前,可以在代码中“保留”消息一段时间,但这不是一个好的方法。
在所有这些选项中,您自己的自定义应用程序读取死信队列,并根据包含重试计数的信封决定是否重新路由消息可能是最好的方法。

问题2.

您可以使用高级API为每个队列指定一个死信交换。(https://github.com/EasyNetQ/EasyNetQ/wiki/The-Advanced-API#declaring-queues)。然而,这意味着您将不得不在几乎所有地方使用高级API,因为使用subscribe/publish的简单IBus实现会查找基于消息类型和订阅者名称命名的队列。使用自定义队列声明意味着您将自己处理队列的命名,这意味着当您订阅时,您需要知道您想要的名称等等。不再需要自动订阅了!

问题3

一个错误队列/死信队列只是另一个队列。你可以监听这个队列,然后对它做你需要做的事情。但是,没有任何现成的解决方案听起来像是适合你的需要。

inb24sb2

inb24sb22#

我已经完全按照你的描述来实现了。下面是一些基于我的经验并与你的每个问题相关的提示。

Q1(如何重试X次):

为此,您可以使用IMessage.Body.BasicProperties.Headers。当您使用错误队列中的消息时,只需添加一个具有您选择的名称的标头。在进入错误队列的每个消息上查找此标头并递增它。这将为您提供一个正在运行的重试计数。
当消息超过X的重试限制时,有一个策略来做什么是 * 非常重要的 *。您不希望丢失该消息。在我的例子中,我在此时将消息写入磁盘。它为您提供了许多有用的调试信息,以便稍后返回,因为EasyNetQ会自动将原始消息与错误信息 Package 在一起。它还包含原始消息,以便您可以:如果您愿意,可以稍后以某种受控的方式手动(或者通过某些批处理重新处理代码自动)对消息进行重新排队。
您可以查看Hosepipe实用程序中的代码,以了解执行此操作的好方法。实际上,如果您遵循此处所示的模式,则如果需要,您甚至可以稍后使用Hosepipe对消息进行重新排队。

Q2(如何为每个原始队列创建错误队列):

你可以使用EasyNetQ高级总线来完成这一任务。使用IBus.Advanced.Container.Resolve<IConventions>来获取约定接口。然后你可以使用conventions.ErrorExchangeNamingConventionconventions.ErrorQueueNamingConvention来设置错误队列命名的约定。在我的例子中,我将约定设置为基于原始队列的名称,以便在每次创建队列时都获得队列的queue/queue_error对。

Q3(如何处理错误队列中的消息):

您可以像声明任何其他队列一样声明错误队列的消费者。同样,AdvancedBus通过指定队列的类型为EasyNetQ.SystemMessage.Error,让您可以干净地完成此操作。因此,IAdvancedBus.Consume<EasyNetQ.SystemMessage.Error>()将使您达到此目的。重试只是意味着重新发布到原始交换(注意标头中的重试次数(请参阅我对上面问题1的回答),以及错误消息中从错误队列中消耗的信息,可以帮助您找到重新发布的目标。

h22fl7wq

h22fl7wq3#

我知道这是一个老职位,但-只是以防万一它帮助别人-这里是my self-answered question(我需要问它,因为现有的帮助是不够的),它解释了我如何实现在它们的原始队列上重试失败的消息。下面应该回答您的问题#1和#3。对于#2,您可能必须使用高级API,我还没有使用过它(我认为它违背了EasyNetQ的目的;也可以直接使用RabbitMQ客户端)。
1)由于一个消息可能有多个使用者,并且所有使用者都不需要重试一个msg,因此我在消息正文中使用了Dictionary<consumerId, RetryInfo>,因为EasyNetQ不支持消息头中的复杂类型(开箱即用)。

public interface IMessageType
{
    int MsgTypeId { get; }

    Dictionary<string, TryInfo> MsgTryInfo {get; set;}

}

2)我已经实现了一个class RetryEnabledErrorMessageSerializer : IErrorMessageSerializer,它在每次被框架调用时只更新TryCount和其他信息。我通过EasyNetQ提供的IoC支持,在每个消费者的基础上将这个自定义序列化程序附加到框架。

public class RetryEnabledErrorMessageSerializer<T> : IErrorMessageSerializer where T : class, IMessageType
 {
        public string Serialize(byte[] messageBody)
        {
             string stringifiedMsgBody = Encoding.UTF8.GetString(messageBody);
             var objectifiedMsgBody = JObject.Parse(stringifiedMsgBody);

             // Add/update RetryInformation into objectifiedMsgBody here
             // I have a dictionary that saves <key:consumerId, val: TryInfoObj>

             return JsonConvert.SerializeObject(objectifiedMsgBody);
        }
  }

在我的EasyNetQ Package 器类中:

public void SetupMessageBroker(string givenSubscriptionId, bool enableRetry = false)
    {
        if (enableRetry)
        {
            _defaultBus = RabbitHutch.CreateBus(currentConnString,
                                                        serviceRegister => serviceRegister.Register<IErrorMessageSerializer>(serviceProvider => new RetryEnabledErrorMessageSerializer<IMessageType>(givenSubscriptionId))
                                                );
        }
        else // EasyNetQ's DefaultErrorMessageSerializer will wrap error messages
        {
            _defaultBus = RabbitHutch.CreateBus(currentConnString);
        }
    }

    public bool SubscribeAsync<T>(Func<T, Task> eventHandler, string subscriptionId)
    {
        IMsgHandler<T> currMsgHandler = new MsgHandler<T>(eventHandler, subscriptionId);
        // Using the msgHandler allows to add a mediator between EasyNetQ and the actual callback function
        // The mediator can transmit the retried msg or choose to ignore it
        return _defaultBus.SubscribeAsync<T>(subscriptionId, currMsgHandler.InvokeMsgCallbackFunc).Queue != null;
    }

3)一旦消息被添加到默认的错误队列中,你就可以拥有一个简单的控制台应用程序/窗口服务,它会定期将现有的错误消息重新发布到它们的原始队列中。

var client = new ManagementClient(AppConfig.BaseAddress, AppConfig.RabbitUsername, AppConfig.RabbitPassword);
var vhost = client.GetVhostAsync("/").Result;
var aliveRes = client.IsAliveAsync(vhost).Result;
var errQueue = client.GetQueueAsync(Constants.EasyNetQErrorQueueName, vhost).Result;
var crit = new GetMessagesCriteria(long.MaxValue, Ackmodes.ack_requeue_false);
var errMsgs = client.GetMessagesFromQueueAsync(errQueue, crit).Result;
foreach (var errMsg in errMsgs)
{
    var innerMsg = JsonConvert.DeserializeObject<Error>(errMsg.Payload);
    var pubInfo = new PublishInfo(innerMsg.RoutingKey, innerMsg.Message);
    pubInfo.Properties.Add("type", innerMsg.BasicProperties.Type);
    pubInfo.Properties.Add("correlation_id", innerMsg.BasicProperties.CorrelationId);
    pubInfo.Properties.Add("delivery_mode", innerMsg.BasicProperties.DeliveryMode);
    var pubRes = client.PublishAsync(client.GetExchangeAsync(innerMsg.Exchange, vhost).Result, pubInfo).Result;
}

4)我有一个MessageHandler类,其中包含一个回调函数。每当消息被传递到使用者时,它都会转到MessageHandler,MessageHandler决定消息try是否有效,如果有效,则调用实际的回调。如果try无效(maxRetriesExceeded/使用者不需要重试),我将忽略该消息。在这种情况下,您可以选择死信该消息。

public interface IMsgHandler<T> where T: class, IMessageType
{
    Task InvokeMsgCallbackFunc(T msg);
    Func<T, Task> MsgCallbackFunc { get; set; }
    bool IsTryValid(T msg, string refSubscriptionId); // Calls callback only 
                                                      // if Retry is valid
}

下面是MsgHandler中调用回调的中介函数:

public async Task InvokeMsgCallbackFunc(T msg)
    {
        if (IsTryValid(msg, CurrSubscriptionId))
        {
            await this.MsgCallbackFunc(msg);
        }
        else
        {
            // Do whatever you want
        }
    }
nvbavucw

nvbavucw4#

在这里,我实现了一个Nuget包(EasyDeadLetter),它可以在任何项目中以最小的改动轻松实现。
1.首先,用QueuAttribute修饰类对象

[Queue(“Product.Report”, ExchangeName = “Product.Report”)]
 public class ProductReport { }

1.第二步是使用相同的QueueAttribute定义死信队列,并从Main对象类继承死信对象。

[Queue(“Product.Report.DeadLetter”, ExchangeName = 
“Product.Report.DeadLetter”)]
public class ProductReportDeadLetter : ProductReport { }

1.现在,是时候用EasyDeadLetter属性来修饰您的主队列对象,并设置死信队列的类型。

[EasyDeadLetter(DeadLetterType =    
     typeof(ProductReportDeadLetter))]
[Queue(“Product.Report”, ExchangeName = “Product.Report”)]
public class ProductReport { }

1.在最后一步中,您需要将EasyDeadLetterStrategy注册为默认错误处理程序(ICummerErrorStrategy)。

services.AddSingleton<IBus> 
 (RabbitHutch.CreateBus(“connectionString”,
    serviceRegister =>
    {
       serviceRegister.Register<IConsumerErrorStrategy, 
       EasyDeadLetterStrategy>();
    }));

从现在开始,任何失败的消息都将被移到相关的死信队列中。
请在此处查看更多详情:GitHub RepositoryNuGet Package显示器

相关问题