RabbitMQ: Why does MassTransit send messages to the skipped queue and throw a timeout exception?

knpiaxh1 posted on 2023-03-23 in RabbitMQ

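I am using MassTransit (8.0.6) in a .NET Core Web API, and every alternate message ends up in the skipped queue and fails with a timeout exception. One request goes through, the next one fails.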
Exception:

    at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
    at MassTransit.Clients.ResponseHandlerConnectHandle`1.d__11.MoveNext() in /_/src/MassTransit/Clients/ResponseHandlerConnectHandle.cs:line 55
    at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
    at MassTransit.Clients.RequestClient

Consumer code:

    {
        try
        {
            var userDetails = new List<UserDetail>();
            var columns = new List<ColumnDetails>() { new ColumnDetails() { ColumnName = "UserId", DataType = typeof(string) } };

            var dataTable = DataTableExtension.CreateDataTable<User>(context.Message.UserIds, columns);

            string[] paramName = { "@UserIds", "@FilePath" };
            var data = this._unitOfWork.GetDataFromStoredProcedure("[rabbitmq].[GetUserDetails]", paramName, dataTable, context.Message.FilePath);
            if (data != null && data.Rows.Count > 0)
            {
                for (int i = 0; i < data.Rows.Count; i++)
                {
                    var user = new UserDetail()
                    {
                        UserImage = Convert.ToString(data.Rows[i]["UserImage"]),
                        FullName = Convert.ToString(data.Rows[i]["FullName"]),
                        UserId = Convert.ToString(data.Rows[i]["UserId"])
                    };
                    userDetails.Add(user);
                }
            }
            if (userDetails.Count == 0)
                await context.RespondAsync<string>("No data found.");
            else
                await context.RespondAsync<GetUserDetailsResult>(new
                {
                    UserDetails = userDetails
                });
        }
        catch (Exception)
        {
            throw; // rethrow without resetting the stack trace
        }
    }

Client code:

    {
        private readonly IRequestClient<GetUserDetails> _client1;
        private readonly IRequestClient<GetRecordTokenDetails> _client2;

        public GetUserAndTokenDetailsRMQHandler(IRequestClient<GetUserDetails> client1, IRequestClient<GetRecordTokenDetails> client2)
        {
            _client1 = client1;
            _client2 = client2;
        }

        public async Task<GetUserAndTokenDetailsResponse> Handle(GetUserAndTokenDetailsRequest request, CancellationToken cancellationToken)
        {
            var response = new GetUserAndTokenDetailsResponse() { UserDetails = new List<UserDetail>(), RecordTokenDetailsResult = new List<TokensDetail>() };
            try
            {
                if (request.UserIds?.Count > 0)
                {
                    // 2. auth consumer
                    var result1 = await _client1.GetResponse<GetUserDetailsResult>(new
                    {
                        Filepath = request.FilePath,
                        UserIds = request.UserIds
                    });

                    foreach (var user in result1.Message.UserDetails)
                    {
                        response.UserDetails.Add(new UserDetail() { FullName = user.FullName, UserImage = user.UserImage, UserId = user.UserId });
                    }
                }

                // 3. payment consumer
                if (request.RecordTokenDetails != null)
                {
                    var result2 = await _client2.GetResponse<GetRecordTokenDetailsResult>(new
                    {
                        RecordTokenDetails = request.RecordTokenDetails
                    });
                    foreach (var record in result2.Message.RecordTokenDetailsResult)
                    {
                        response.RecordTokenDetailsResult.Add(new TokensDetail()
                        {
                            SharesAvailable = record.SharesAvailable,
                            TotalPendingInvestment = record.TotalPendingInvestment,
                            MyStatusOnRecord = record.MyStatusOnRecord,
                            MyStatusTypeOnRecord = record.MyStatusTypeOnRecord,
                            ArtistId = record.ArtistId,
                            RecordId = record.RecordId,
                            LoggedInUserId = record.LoggedInUserId
                        });
                    }
                }
            }
            catch (Exception)
            {
                throw; // rethrow without resetting the stack trace
            }
            return response;
        }
    }

Client configuration:

    services.AddMassTransit(config =>
    {
        config.AddConsumer<GetReleasedCountOfAnArtistConsumer>();
        config.UsingRabbitMq((ctx, cfg) =>
        {
            cfg.Host("amqp://guest:guest@localhost:5672");
            cfg.ReceiveEndpoint("xxx-yyy-queue", c =>
            {
                c.AutoStart = true;
                c.AutoDelete = true;
                c.ConfigureConsumer<GetReleasedCountOfAnArtistConsumer>(ctx);
            });
        });
        config.AddRequestClient<GetReleasedCountOfAnArtist>();
    });

Consumer configuration:

    {
        config.AddConsumer<GetUserDetailsConsumer>();
        config.UsingRabbitMq((ctx, cfg) =>
        {
            cfg.Host("amqp://guest:guest@localhost:5672");
            cfg.PrefetchCount = 32;
            cfg.ReceiveEndpoint("xxx-zzz-queue", c =>
            {
                c.AutoStart = true;
                c.AutoDelete = true;
                c.UseMessageRetry(r => r.Immediate(5));
                c.ConfigureConsumer<GetUserDetailsConsumer>(ctx);
            });
        });
        config.AddRequestClient<GetUserDetails>(TimeSpan.FromSeconds(30));
    });

enxuqcxy

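I'm not sure whether this is the problem, but your client code expects a `GetUserDetailsResult` response, while your consumer code sometimes responds with a `string` (in the `if (userDetails.Count == 0)` branch).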
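
One way to act on this suggestion, sketched under assumptions rather than confirmed as the fix: a client calling `GetResponse<GetUserDetailsResult>` waits for a response of that type (or a fault), so a reply of a different type would plausibly go unhandled on the client side, which could explain the skipped queue, while the request itself runs into its timeout. MassTransit request clients can also accept two response types through `GetResponse<T1, T2>`. In the sketch below, `UserDetailsNotFound`, `UserDetailsClient`, and `LoadUsersAsync` are hypothetical names introduced for illustration, and the message classes are guesses at the shapes implied by the question.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using MassTransit;

// Message contracts. The property lists are assumptions based on the question.
public class UserDetail
{
    public string UserId { get; set; } = "";
    public string FullName { get; set; } = "";
    public string UserImage { get; set; } = "";
}

public class GetUserDetails
{
    public List<string> UserIds { get; set; } = new();
    public string FilePath { get; set; } = "";
}

public class GetUserDetailsResult
{
    public List<UserDetail> UserDetails { get; set; } = new();
}

// Hypothetical "nothing found" response that replaces the plain string reply.
public class UserDetailsNotFound
{
    public string Reason { get; set; } = "";
}

public class GetUserDetailsConsumer : IConsumer<GetUserDetails>
{
    public async Task Consume(ConsumeContext<GetUserDetails> context)
    {
        // Stand-in for the stored-procedure lookup shown in the question.
        List<UserDetail> userDetails = await LoadUsersAsync(context.Message);

        if (userDetails.Count == 0)
            await context.RespondAsync(new UserDetailsNotFound { Reason = "No data found." });
        else
            await context.RespondAsync(new GetUserDetailsResult { UserDetails = userDetails });
    }

    // Hypothetical helper; always returns an empty list in this sketch.
    private static Task<List<UserDetail>> LoadUsersAsync(GetUserDetails request) =>
        Task.FromResult(new List<UserDetail>());
}

public class UserDetailsClient
{
    private readonly IRequestClient<GetUserDetails> _client;

    public UserDetailsClient(IRequestClient<GetUserDetails> client) => _client = client;

    public async Task<List<UserDetail>> GetUsersAsync(List<string> userIds, string filePath)
    {
        // Accept either response type so the "not found" reply also completes the request.
        var response = await _client.GetResponse<GetUserDetailsResult, UserDetailsNotFound>(
            new GetUserDetails { UserIds = userIds, FilePath = filePath });

        if (response.Is(out Response<GetUserDetailsResult> found))
            return found.Message.UserDetails;

        // Otherwise the consumer replied with UserDetailsNotFound.
        return new List<UserDetail>();
    }
}
```

A simpler alternative is to always respond with `GetUserDetailsResult` and an empty `UserDetails` list, which the existing `foreach` loop in the client already handles without changes.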
