rabbitmq 当MassTransit状态机请求返回Request TimeoutExpired时,如何避免将消息保留在端点队列中

oug3syen  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(169)

我的应用程序中有一个特定的场景,即当新用户创建新帐户时,我需要为他创建新工作区。在我的场景中,我还必须等待工作区创建完成,才能向客户端返回成功消息,告知其帐户已创建,工作区已准备就绪
因此,我使用了MassTransit Request方法来实现我所需要的功能,它的工作非常完美,正如您所看到的,我正在使用此请求启动我的状态机

var response = await _prepareAdvertiserAccountRequest.GetResponse<AdvertiserAccountPreparedIE, AdvertiserAccountPreparationFailedIE>(new PrepareAdvertiserAccountIE
            {
                Owner = notification.Owner,
                FirstName = notification.FirstName,
                LastName = notification.LastName,
            });

这是我的状态机,我正在保存ResponseAddress,然后请求发送消息,所有内容都以字母形式工作,请求正在等待创建工作空间,然后结束请求

using MassTransit;
using EventBus.Messages.Events.Identity.AdvertiserSignUpSagaEvents;

namespace Auth.API.StateMachines.AdvertiserAccount
{
    public class AdvertiserAccountStateMachine :
        MassTransitStateMachine<AdvertiserAccountState>
    {
        public AdvertiserAccountStateMachine()
        {
            InstanceState(x => x.CurrentState);

            Event(() => PrepareAdvertiserAccount, x => x.CorrelateById(m => m.Message.Owner));

            Request(() => CreateWorkspaceRequested, x => x.CreateWorkspaceRequestId, x =>
            {
                x.ServiceAddress = new Uri($"queue:create-workspace");
                x.Timeout = TimeSpan.FromSeconds(20);
            });

            Initially(
                When(PrepareAdvertiserAccount)
                .Then(context =>
                {
                    context.Saga.CreateWorkspaceRequestId = Guid.NewGuid();

                    context.Saga.AccountId = context.Message.Owner;
                    context.Saga.FirstName = context.Message.FirstName;
                    context.Saga.LastName = context.Message.LastName;

                    context.Saga.ResponseAddress = context.ResponseAddress!;
                    context.Saga.RequestId = context.RequestId;
                })
                .Request(CreateWorkspaceRequested, x => x.Init<CreateWorkspaceIE>(new CreateWorkspaceIE
                {
                    Owner = x.Saga.AccountId,
                    FirstName = x.Saga.FirstName,
                    LastName = x.Saga.LastName,
                }))
                .TransitionTo(CreateWorkspaceRequested.Pending));

            During(CreateWorkspaceRequested.Pending,
                When(CreateWorkspaceRequested.Completed)
                    .Then(context =>
                    {
                        context.Saga.WorkspaceId = context.Message.WorkspaceId;
                    })
                    .TransitionTo(PreparationCompleted),
                When(CreateWorkspaceRequested.Faulted)
                    .TransitionTo(PreparationFailed),
                When(CreateWorkspaceRequested.TimeoutExpired)
                    .TransitionTo(PreparationFailed));

            WhenEnter(PreparationFailed, x => x.ThenAsync(async context =>
            {
                var endpoint = await context.GetSendEndpoint(context.Saga.ResponseAddress);
                await endpoint.Send(new AdvertiserAccountPreparationFailedIE(), r => r.RequestId = context.Saga.RequestId);
            }));
            WhenEnter(PreparationCompleted, x => x.ThenAsync(async context =>
            {
                var endpoint = await context.GetSendEndpoint(context.Saga.ResponseAddress);
                await endpoint.Send(new AdvertiserAccountPreparedIE(), r => r.RequestId = context.Saga.RequestId);
            }).Finalize());

            SetCompletedWhenFinalized();
        }

        public State PreparationFailed { get; set; }
        public State PreparationCompleted { get; set; }

        public State WorkspaceCreated { get; set; }

        public Event<PrepareAdvertiserAccountIE> PrepareAdvertiserAccount { get; set; }
        public Request<AdvertiserAccountState, CreateWorkspaceIE, WorkspaceCreatedIE> CreateWorkspaceRequested { get; set; }
    }
}

但我的问题是,当Workspace Service Down(create-workspace)队列未被使用时,请求将返回CreateWorkspaceRequested.TimeoutExpired,然后状态机将结束请求并将AdvertiserAccountPreparationFailedIE事件返回到请求客户端(我们启动所述机器的客户端)
但在CreateWorkspaceRequested.TimeoutExpired之后,queue:create-workspace (the serviceAddress that State Machine Request Sending to )会在工作区服务可用时将消息保持为准备使用的状态,我不需要发生这种情况,因为当返回CreateWorkspaceRequested.TimeoutExpired时,我将回滚创建帐户,因此不再需要创建工作区

a6b3iqyw

a6b3iqyw1#

如果将请求初始值设定项更改为:

.Request(CreateWorkspaceRequested, x => x.Init<CreateWorkspaceIE>(new
{
    Owner = x.Saga.AccountId,
    FirstName = x.Saga.FirstName,
    LastName = x.Saga.LastName,
    __TimeToLive = TimeSpan.FromSeconds(30)
}))

将在传出邮件上设置TimeToLive标头。

相关问题