I'm using MassTransit (8.0.6) in a .NET Core Web API, and every other message skips the queue and fails with a timeout exception: one request succeeds, the next one fails.
Exception:

    at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
    at MassTransit.Clients.ResponseHandlerConnectHandle`1.d__11.MoveNext() in /_/src/MassTransit/Clients/ResponseHandlerConnectHandle.cs:line 55
    at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
    at MassTransit.Clients.RequestClient
Consumer code:

    {
        try
        {
            var userDetails = new List<UserDetail>();
            var columns = new List<ColumnDetails>() { new ColumnDetails() { ColumnName = "UserId", DataType = typeof(string) } };
            var dataTable = DataTableExtension.CreateDataTable<User>(context.Message.UserIds, columns);
            string[] paramName = { "@UserIds", "@FilePath" };
            var data = this._unitOfWork.GetDataFromStoredProcedure("[rabbitmq].[GetUserDetails]", paramName, dataTable, context.Message.FilePath);
            if (data != null && data.Rows.Count > 0)
            {
                for (int i = 0; i < data.Rows.Count; i++)
                {
                    var user = new UserDetail()
                    {
                        UserImage = Convert.ToString(data.Rows[i]["UserImage"]),
                        FullName = Convert.ToString(data.Rows[i]["FullName"]),
                        UserId = Convert.ToString(data.Rows[i]["UserId"])
                    };
                    userDetails.Add(user);
                }
            }
            if (userDetails.Count == 0)
                await context.RespondAsync<string>("No data found.");
            else
                await context.RespondAsync<GetUserDetailsResult>(new
                {
                    UserDetails = userDetails
                });
        }
        catch (Exception)
        {
            // Rethrow with "throw;" so the original stack trace is preserved.
            throw;
        }
    }

Client code:

    {
        private readonly IRequestClient<GetUserDetails> _client1;
        private readonly IRequestClient<GetRecordTokenDetails> _client2;

        public GetUserAndTokenDetailsRMQHandler(IRequestClient<GetUserDetails> client1, IRequestClient<GetRecordTokenDetails> client2)
        {
            // Assign to the underscore-prefixed fields declared above.
            this._client1 = client1;
            this._client2 = client2;
        }

        public async Task<GetUserAndTokenDetailsResponse> Handle(GetUserAndTokenDetailsRequest request, CancellationToken cancellationToken)
        {
            var response = new GetUserAndTokenDetailsResponse() { UserDetails = new List<UserDetail>(), RecordTokenDetailsResult = new List<TokensDetail>() };
            try
            {
                if (request.UserIds?.Count > 0)
                {
                    //2.auth consumer
                    var result1 = await _client1.GetResponse<GetUserDetailsResult>(new
                    {
                        Filepath = request.FilePath,
                        UserIds = request.UserIds
                    });
                    foreach (var user in result1.Message.UserDetails)
                    {
                        response.UserDetails.Add(new UserDetail() { FullName = user.FullName, UserImage = user.UserImage, UserId = user.UserId });
                    }
                }
                //3.payment consumer
                if (request.RecordTokenDetails != null)
                {
                    var result2 = await _client2.GetResponse<GetRecordTokenDetailsResult>(new
                    {
                        RecordTokenDetails = request.RecordTokenDetails
                    });
                    foreach (var record in result2.Message.RecordTokenDetailsResult)
                    {
                        response.RecordTokenDetailsResult.Add(new TokensDetail()
                        {
                            SharesAvailable = record.SharesAvailable,
                            TotalPendingInvestment = record.TotalPendingInvestment,
                            MyStatusOnRecord = record.MyStatusOnRecord,
                            MyStatusTypeOnRecord = record.MyStatusTypeOnRecord,
                            ArtistId = record.ArtistId,
                            RecordId = record.RecordId,
                            LoggedInUserId = record.LoggedInUserId
                        });
                    }
                }
            }
            catch (Exception)
            {
                // Rethrow with "throw;" so the original stack trace is preserved.
                throw;
            }
            return response;
        }
    }

Client configuration:

    services.AddMassTransit(config =>
    {
        config.AddConsumer<GetReleasedCountOfAnArtistConsumer>();
        config.UsingRabbitMq((ctx, cfg) =>
        {
            cfg.Host("amqp://guest:guest@localhost:5672");
            cfg.ReceiveEndpoint("xxx-yyy-queue", c =>
            {
                c.AutoStart = true;
                c.AutoDelete = true;
                c.ConfigureConsumer<GetReleasedCountOfAnArtistConsumer>(ctx);
            });
        });
        config.AddRequestClient<GetReleasedCountOfAnArtist>();
    });

Consumer configuration:

    {
        config.AddConsumer<GetUserDetailsConsumer>();
        config.UsingRabbitMq((ctx, cfg) =>
        {
            cfg.Host("amqp://guest:guest@localhost:5672");
            cfg.PrefetchCount = 32;
            cfg.ReceiveEndpoint("xxx-zzz-queue", c =>
            {
                c.AutoStart = true;
                c.AutoDelete = true;
                c.UseMessageRetry(r => r.Immediate(5));
                c.ConfigureConsumer<GetUserDetailsConsumer>(ctx);
            });
        });
        config.AddRequestClient<GetUserDetails>(TimeSpan.FromSeconds(30));
    });

1 answer
enxuqcxy #1
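I'm not sure whether this is the problem, but your client code expects a `GetUserDetailsResult` response, while your consumer code sometimes responds with a `string` (in the `if (userDetails.Count == 0)` branch).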
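
To illustrate the mismatch: a request client waiting for `GetUserDetailsResult` only completes when a response of that type arrives, so a `string` response would leave the request pending until it times out. One possible way around it, sketched here under assumptions, is to respond with a dedicated "not found" message and use the two-response overload of `GetResponse`. `UserDetailsNotFound`, `UserDetailsReader` and `LoadUserDetails` are hypothetical names introduced for this sketch, and the contracts are written as plain classes, which may differ from the asker's actual types. It needs the MassTransit NuGet package and is not a confirmed fix for the alternating failures.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using MassTransit;

// Message contracts. GetUserDetails and GetUserDetailsResult are named in the
// question; their shapes here are assumed. UserDetailsNotFound is hypothetical.
public class GetUserDetails
{
    public List<string> UserIds { get; set; } = new List<string>();
    public string FilePath { get; set; } = "";
}

public class UserDetail
{
    public string UserId { get; set; } = "";
    public string FullName { get; set; } = "";
    public string UserImage { get; set; } = "";
}

public class GetUserDetailsResult
{
    public List<UserDetail> UserDetails { get; set; } = new List<UserDetail>();
}

public class UserDetailsNotFound
{
    public string Reason { get; set; } = "";
}

public class GetUserDetailsConsumer : IConsumer<GetUserDetails>
{
    public async Task Consume(ConsumeContext<GetUserDetails> context)
    {
        // Hypothetical stand-in for the stored-procedure call in the question.
        List<UserDetail> userDetails = LoadUserDetails(context.Message);

        // Both branches respond with a message type the client is waiting for.
        if (userDetails.Count == 0)
            await context.RespondAsync<UserDetailsNotFound>(
                new UserDetailsNotFound { Reason = "No data found." });
        else
            await context.RespondAsync<GetUserDetailsResult>(
                new GetUserDetailsResult { UserDetails = userDetails });
    }

    // Placeholder that always returns an empty list; replace with real data access.
    private static List<UserDetail> LoadUserDetails(GetUserDetails request)
        => new List<UserDetail>();
}

public class UserDetailsReader
{
    private readonly IRequestClient<GetUserDetails> _client;

    public UserDetailsReader(IRequestClient<GetUserDetails> client) => _client = client;

    public async Task<List<UserDetail>> Read(List<string> userIds, string filePath)
    {
        // Accept either response type instead of only GetUserDetailsResult.
        Response<GetUserDetailsResult, UserDetailsNotFound> response =
            await _client.GetResponse<GetUserDetailsResult, UserDetailsNotFound>(
                new GetUserDetails { UserIds = userIds, FilePath = filePath });

        if (response.Is(out Response<GetUserDetailsResult> found))
            return found.Message.UserDetails;

        // Otherwise the consumer answered with UserDetailsNotFound.
        return new List<UserDetail>();
    }
}
```

With this shape the "no data" case comes back as an ordinary response rather than a timeout, and the caller decides how to treat it (here, as an empty list).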