.net 在MassTransit 8 Saga 中触发预定事件

bqujaahr  于 2023-05-30  发布在  .NET
关注(0)|答案(1)|浏览(155)

当从Masstransit 7. 3升级到Masstransit 8. 0. 15时,我发现了一个似乎是突破性的变化,我想知道以前是否还有人遇到过这个问题。
我有一个 Saga 计划每30秒重新启动一次作业:

Schedule(() => RestartConversion, instance => instance.RestartConversionTokenId, s =>
{
    s.Delay = TimeSpan.FromSeconds(30);
    s.Received = r => r.CorrelateById(context => context.Message.ItemId);
});

然而,在我的单元测试中,在以前的7.3版中,我能够通过发布RestartConversionEvent手动触发此事件,这将在When(RestartConversion.Received)中触发。

var endPoint = await Harness.Bus.GetSendEndpoint(Harness.InputQueueAddress);
await endPoint.Send(new DocumentConversionSaga.RestartConversionSchedule
{
    ItemId = Guid.Parse(uploadDocumentInfo.ItemId),
    ConversionJobInstanceId = conversionInstanceId,
    PreviewJobInstanceId = previewInstanceId
});

这种行为在Masstransit 8中似乎不再起作用,因为如果我等待30秒,我可以看到RestartConversion事件被自动发送。是否有解决此问题的方法?
线束配置:

.AddMassTransitInMemoryTestHarness(cfg =>
{
    cfg.SetInMemorySagaRepositoryProvider();
    cfg.AddSagaStateMachine<DocumentConversionSaga, DocumentConversionState>().InMemoryRepository();
    cfg.AddRequestClient<StartConversionCommand>();
})

Harness = provider.GetRequiredService<InMemoryTestHarness>();
Harness.OnConfigureBus += cfg =>
{
    cfg.UseDelayedMessageScheduler();
    BusTestFixture.ConfigureBusDiagnostics(cfg);
    cfg.UseMessageData(new InMemoryMessageDataRepository());
    cfg.UseNewtonsoftJsonSerializer();
};

我尝试更改内存中的测试工具以使用以下内容,但结果完全相同。

.AddMassTransitTestHarness(cfg =>
{
    cfg.SetInMemorySagaRepositoryProvider();
    cfg.AddSagaStateMachine<DocumentConversionSaga, DocumentConversionState>().InMemoryRepository();
    cfg.AddRequestClient<StartConversionCommand>();
    cfg.UsingInMemory((ctx, x) =>
    {
        x.UseDelayedMessageScheduler();
        BusTestFixture.ConfigureBusDiagnostics(x);
        x.UseMessageData(new InMemoryMessageDataRepository());
        x.UseNewtonsoftJsonSerializer();
        x.ConfigureEndpoints(ctx);
    });
})
nimxete2

nimxete21#

您应该始终查看日志,以准确了解在封面下发生了什么,但如果我不得不猜测,您正在向 Saga 发送一个未预期的事件。如果 Saga 没有调度消息(并将调度的消息tokenId保留在saga示例中),它将忽略该消息,因为它不是当前消息。这是为了避免“我取消了对消息的调度,但它还是到达了”的场景,该场景是调度的消息与消息调度器的未决取消之间的竞争条件。
要使其工作,您可能必须将tokenId存储在 Saga 示例中,并发送包含该tokenId的“预定”事件。

相关问题