当从Masstransit 7. 3升级到Masstransit 8. 0. 15时,我发现了一个似乎是突破性的变化,我想知道以前是否还有人遇到过这个问题。
我有一个 Saga 计划每30秒重新启动一次作业:
Schedule(() => RestartConversion, instance => instance.RestartConversionTokenId, s =>
{
s.Delay = TimeSpan.FromSeconds(30);
s.Received = r => r.CorrelateById(context => context.Message.ItemId);
});
然而,在我的单元测试中,在以前的7.3版中,我能够通过发布RestartConversionEvent
手动触发此事件,这将在When(RestartConversion.Received)
中触发。
var endPoint = await Harness.Bus.GetSendEndpoint(Harness.InputQueueAddress);
await endPoint.Send(new DocumentConversionSaga.RestartConversionSchedule
{
ItemId = Guid.Parse(uploadDocumentInfo.ItemId),
ConversionJobInstanceId = conversionInstanceId,
PreviewJobInstanceId = previewInstanceId
});
这种行为在Masstransit 8中似乎不再起作用,因为如果我等待30秒,我可以看到RestartConversion
事件被自动发送。是否有解决此问题的方法?
线束配置:
.AddMassTransitInMemoryTestHarness(cfg =>
{
cfg.SetInMemorySagaRepositoryProvider();
cfg.AddSagaStateMachine<DocumentConversionSaga, DocumentConversionState>().InMemoryRepository();
cfg.AddRequestClient<StartConversionCommand>();
})
Harness = provider.GetRequiredService<InMemoryTestHarness>();
Harness.OnConfigureBus += cfg =>
{
cfg.UseDelayedMessageScheduler();
BusTestFixture.ConfigureBusDiagnostics(cfg);
cfg.UseMessageData(new InMemoryMessageDataRepository());
cfg.UseNewtonsoftJsonSerializer();
};
我尝试更改内存中的测试工具以使用以下内容,但结果完全相同。
.AddMassTransitTestHarness(cfg =>
{
cfg.SetInMemorySagaRepositoryProvider();
cfg.AddSagaStateMachine<DocumentConversionSaga, DocumentConversionState>().InMemoryRepository();
cfg.AddRequestClient<StartConversionCommand>();
cfg.UsingInMemory((ctx, x) =>
{
x.UseDelayedMessageScheduler();
BusTestFixture.ConfigureBusDiagnostics(x);
x.UseMessageData(new InMemoryMessageDataRepository());
x.UseNewtonsoftJsonSerializer();
x.ConfigureEndpoints(ctx);
});
})
1条答案
按热度按时间nimxete21#
您应该始终查看日志,以准确了解在封面下发生了什么,但如果我不得不猜测,您正在向 Saga 发送一个未预期的事件。如果 Saga 没有调度消息(并将调度的消息tokenId保留在saga示例中),它将忽略该消息,因为它不是当前消息。这是为了避免“我取消了对消息的调度,但它还是到达了”的场景,该场景是调度的消息与消息调度器的未决取消之间的竞争条件。
要使其工作,您可能必须将tokenId存储在 Saga 示例中,并发送包含该tokenId的“预定”事件。