rabbitmq 状态机转换

thtygnil  于 2023-08-05  发布在  RabbitMQ
关注(0)|答案(1)|浏览(185)

所以,我一定是漏掉了什么,在我看来,即使事件没有发生,我的状态机也会阻止执行。这是我的状态机:

public class OrderStateMachine : MassTransitStateMachine<NormalScanState>
{
    public OrderStateMachine()
    {
        Event(() => OrderScannedEvent, x => x.CorrelateById(context => context.Message.CorrelationId));
        Event(() => OrderDataRetrieved,
            x => x.CorrelateById(context => context.Message.CorrelationId));
        Event(() => SortingGeneratedEvent,
            x => x.CorrelateById(context => new Guid(context.GetHeader("correlation-id")!)));

        InstanceState(x => x.CurrentState);

        Initially(
            When(OrderScannedEvent)
                .Then(context =>
                {
                    context.Saga.CorrelationId = context.Message.CorrelationId;
                    context.Saga.OrderId = context.Message.OrderId;
                    context.Saga.OrderData.TrackingNumber = context.Message.TrackingNumber;
                })
                .Activity(x => x.OfType<GetOrderActivity>())
                .TransitionTo(GetOrderSubmitted));

        During(GetOrderSubmitted,
            Ignore(OrderScannedEvent),
            Ignore(SortingGeneratedEvent),
            When(OrderDataRetrieved)
                .Publish(context => new GenerateSorting
                    {
                        Height = context.Saga.OrderData.Height,
                        Length = context.Saga.OrderData.Length,
                    }
                    , AddHeaders)
                .TransitionTo(GenerateSortingSubmitted));

        During(GenerateSortingSubmitted,
            Ignore(OrderScannedEvent),
            Ignore(OrderDataRetrieved),
            When(SortingGeneratedEvent)
                .Then(context =>
                {
                    context.Saga.OrderData.SortCode = context.Message.SortCode;
                    context.Saga.OrderData.RouteCode = context.Message.RouteCode;
                    context.Saga.OrderData.Carrier = context.Message.Carrier;
                })              
                .TransitionTo(Accepted));

    }

    public State GetOrderSubmitted { get; private set; }
    public State GenerateSortingSubmitted { get; private set; }  
    public State Accepted { get; private set; }

    public Event<OrderSubmitted>? OrderSubmittedEvent { get; private set; }
    public Event<OrderDataRetrieved> OrderDataRetrieved { get; private set; }
    public Event<SortingGenerated> SortingGeneratedEvent { get; private set; }

    private void AddHeaders(PublishContext publishContext)
    {
        var consumeContext = publishContext.GetPayload<ConsumeContext>();
        var correlationId = consumeContext.CorrelationId;
        publishContext.Headers.Set("correlation-id", correlationId);
    }
}

字符串
即使OrderDataRetrieved和GenerateSortingSubmitted事件没有发生,During(GetOrderSubmitted)和During(GenerateSortingSubmitted)中的代码也会被执行。
这是我的安装程序:

public void InstallServices(IServiceCollection services, IConfiguration configuration)
    {
        services.AddMassTransit(cfg =>
        {
            var entryAssembly = Assembly.GetEntryAssembly();
            
            cfg.SetKebabCaseEndpointNameFormatter();
            cfg.SetInMemorySagaRepositoryProvider();
   
            cfg.AddConsumers(entryAssembly);
            cfg.AddSagaStateMachines(entryAssembly);
            cfg.AddSagas(entryAssembly);
            cfg.AddActivities(entryAssembly);   
            
            cfg.UsingRabbitMq((context, configurator) =>
            {
                configurator.Host(queueConfig.ConnectionString);
            

                configurator.DeployPublishTopology = true;

                configurator.ClearSerialization();
                configurator.UseRawJsonSerializer(RawSerializerOptions.AnyMessageType);

                configurator.ConfigureEndpoints(context);
            });
        });
    }


我错过了什么?

niknxzdl

niknxzdl1#

第一个月
这不适用于使用多个事件类型的 Saga ,因为它将把每个入站消息都视为与状态机中的每个事件匹配的消息。
文档介绍了原始JSON的工作原理。

相关问题