rabbitmq 在MassTransit中未触发复合事件

vq8itlhq  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(155)

我的问题是,当我收到所有必需的事件(在我的例子中是 LinkContract 和 LinkCustomer)之后,复合事件并没有被触发。还有一件奇怪的事情:当我发送 LinkContract 事件时,ProcessingStatus 的值变成了 1;然后当我收到第二个事件 LinkCustomer 时,它变成了 0,但我认为它应该是 3。有人能解释一下为什么会发生这种情况,以及为什么我的复合事件没有被触发吗?
我已经在MassTransitStateMachine中注册了复合事件:

using MassTransit;
using Sandbox.WebApi.Models.StateEvents;
using Serilog;

namespace Sandbox.WebApi.StateMachines;

/// <summary>
/// Saga state machine for billing. It links a customer and a contract to a
/// <see cref="BillingState"/> instance and raises <see cref="PaymentSucceeded"/>
/// as a composite of <see cref="LinkContract"/> and <see cref="LinkCustomer"/>.
/// </summary>
public class BillingStateMachine : MassTransitStateMachine<BillingState>
{
    /// <summary>State entered once the first link event has been handled.</summary>
    public State Processing { get; }

    /// <summary>State entered after <see cref="PaymentSucceeded"/> is handled.</summary>
    public State Paid { get; }

    /// <summary>Composite event built from <see cref="LinkContract"/> and <see cref="LinkCustomer"/>.</summary>
    public Event PaymentSucceeded { get; }

    /// <summary>Raised by a <c>ContractCreated</c> message.</summary>
    public Event<ContractCreated> LinkContract { get; }

    /// <summary>Raised by a <c>CustomerCreated</c> message.</summary>
    public Event<CustomerCreated> LinkCustomer { get; }

    /// <summary>
    /// Declares correlation, state transitions and the composite event.
    /// The declaration order below is intentional and kept as-is.
    /// </summary>
    public BillingStateMachine()
    {
        InstanceState(saga => saga.CurrentState);

        // Both messages are correlated to the saga by the customer id they carry.
        Event(() => LinkCustomer, cfg => cfg.CorrelateById(ctx => ctx.Message.CustomerId));
        Event(() => LinkContract, cfg => cfg.CorrelateById(ctx => ctx.Message.CustomerId));

        // Either message may arrive first; both move the saga to Processing.
        Initially(
            When(LinkCustomer).Then(HandleLinkCustomer).TransitionTo(Processing),
            When(LinkContract).Then(HandleLinkContract).TransitionTo(Processing));

        // While processing, the other message is recorded without a transition.
        During(Processing,
            When(LinkContract).Then(HandleLinkContract),
            When(LinkCustomer).Then(HandleLinkCustomer));

        // Progress of the composite event is tracked in BillingState.ProcessingStatus.
        CompositeEvent(
            () => PaymentSucceeded,
            saga => saga.ProcessingStatus,
            CompositeEventOptions.IncludeInitial,
            LinkContract,
            LinkCustomer);

        During(Processing,
            When(PaymentSucceeded).Then(HandlePaymentSucceeded).TransitionTo(Paid));
    }

    /// <summary>Copies the customer id from the message onto the saga and logs it.</summary>
    private static void HandleLinkCustomer(BehaviorContext<BillingState, CustomerCreated> context)
    {
        var saga = context.Saga;
        saga.CustomerId = context.Message.CustomerId;

        Log.Information("Customer was linked: {CustomerId}", saga.CustomerId);
    }

    /// <summary>Copies the contract and customer ids from the message onto the saga and logs the contract id.</summary>
    private static void HandleLinkContract(BehaviorContext<BillingState, ContractCreated> context)
    {
        var saga = context.Saga;
        var message = context.Message;

        saga.ContractId = message.ContractId;
        saga.CustomerId = message.CustomerId;

        Log.Information("Contract was linked: {ContractId}", saga.ContractId);
    }

    /// <summary>Logs the contract and customer ids of the saga that was paid.</summary>
    private static void HandlePaymentSucceeded(BehaviorContext<BillingState> context)
    {
        var saga = context.Saga;

        Log.Information("Billing was paid: {ContractId} & {CustomerId}",
            saga.ContractId, saga.CustomerId);
    }
}

和以下状态:

using MassTransit;

namespace Sandbox.WebApi.StateMachines;

/// <summary>
/// Persisted saga instance used by <c>BillingStateMachine</c>.
/// </summary>
public class BillingState : SagaStateMachineInstance, ISagaVersion
{
    /// <summary>Identifier the saga instance is correlated by.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Current state of the instance, stored as a string.</summary>
    public string CurrentState { get; set; }

    /// <summary>
    /// Tracks which events of the composite event have been received.
    /// Declared as <c>int</c> rather than <c>CompositeEventStatus</c>:
    /// NOTE(review): the struct presumably does not round-trip through the
    /// Redis repository's serializer, which would explain the value resetting
    /// between messages — confirm against the serializer in use.
    /// </summary>
    public int ProcessingStatus { get; set; }

    /// <summary>
    /// Version member declared by <c>ISagaVersion</c>; presumably used by the
    /// saga repository for concurrency checks — confirm.
    /// </summary>
    public int Version { get; set; }

    /// <summary>Customer linked to this billing instance.</summary>
    public Guid CustomerId { get; set; }

    /// <summary>Contract linked to this billing instance.</summary>
    public Guid ContractId { get; set; }
}

有DI配置:

// Registers MassTransit with consumers, the billing saga (stored in Redis)
// and RabbitMQ as the transport.
services.AddMassTransit(opts =>
{
    // Endpoint names are produced by the kebab-case formatter.
    opts.SetEndpointNameFormatter(KebabCaseEndpointNameFormatter.Instance);

    // `assembly` is defined outside this snippet.
    opts.AddConsumers(assembly);
    // The saga state machine persists BillingState through the Redis repository.
    // NOTE(review): no message retry is configured anywhere in this snippet;
    // consider adding one for the saga so concurrency failures are retried.
    opts.AddSagaStateMachine<BillingStateMachine, BillingState>()
        .RedisRepository(x => x
            .DatabaseConfiguration("<connection_string>"));

    opts.UsingRabbitMq((ctx, cfg) =>
    {
        // Configures receive endpoints for everything registered above.
        cfg.ConfigureEndpoints(ctx);
        cfg.AutoStart = true;
        // NOTE(review): host and credentials are hard-coded; presumably for
        // local development only — confirm before reusing elsewhere.
        cfg.Host("localhost", 5672, "/", r =>
        {
            r.Username("guest");
            r.Password("guest");
        });
    });
});

另外,如果这将有所帮助,还有日志:

[13:04:29 INF] Bus started: rabbitmq://localhost/
[13:05:16 DBG] Declare exchange: name: Sandbox.WebApi.Models.StateEvents:ContractCreated, type: fanout, durable
[13:05:16 DBG] SEND rabbitmq://localhost/Sandbox.WebApi.Models.StateEvents:ContractCreated eb480000-25c5-9e68-3b62-08da536d8a2f Sandbox.WebApi.Models.StateEvents.ContractCreated
[13:05:16 INF] HTTP POST /api/User/contract responded 204 in 322.4377 ms
[13:05:16 DBG] SAGA:Sandbox.WebApi.StateMachines.BillingState:00000000-507f-1f77-bcf8-6cd799439011 Created Sandbox.WebApi.Models.StateEvents.ContractCreated
[13:05:16 DBG] SAGA:Sandbox.WebApi.StateMachines.BillingState:00000000-507f-1f77-bcf8-6cd799439011 Added Sandbox.WebApi.Models.StateEvents.ContractCreated
[13:05:16 INF] Contract was linked: 00000000-507f-1f77-bcf8-6cd799439011
[13:05:16 DBG] RECEIVE rabbitmq://localhost/billing-state eb480000-25c5-9e68-3b62-08da536d8a2f Sandbox.WebApi.Models.StateEvents.ContractCreated Sandbox.WebApi.StateMachines.BillingState(00:00:00.4622311)
[13:06:29 DBG] Declare exchange: name: Sandbox.WebApi.Models.StateEvents:CustomerCreated, type: fanout, durable
[13:06:29 DBG] SEND rabbitmq://localhost/Sandbox.WebApi.Models.StateEvents:CustomerCreated eb480000-25c5-9e68-751a-08da536db5cb Sandbox.WebApi.Models.StateEvents.CustomerCreated
[13:06:29 INF] HTTP POST /api/User/customer responded 204 in 360.3700 ms
[13:06:29 DBG] SAGA:Sandbox.WebApi.StateMachines.BillingState:00000000-507f-1f77-bcf8-6cd799439011 Used Sandbox.WebApi.Models.StateEvents.CustomerCreated
[13:06:29 INF] Customer was linked: 00000000-507f-1f77-bcf8-6cd799439011
[13:06:29 DBG] RECEIVE rabbitmq://localhost/billing-state eb480000-25c5-9e68-751a-08da536db5cb Sandbox.WebApi.Models.StateEvents.CustomerCreated Sandbox.WebApi.StateMachines.BillingState(00:00:00.3579677)
hts6caw3

hts6caw31#

只是猜测:这可能是 Redis 的某种序列化问题。请尝试把复合事件状态属性的类型改为 int:
public int ProcessingStatus { get; set; }
此外,请确保为 saga 配置了重试(retry),以便在出现并发问题时自动重试。

相关问题