RabbitMQ: Does the MassTransit transactional outbox work with a multi-bus (MultiBus) setup?

muk1a3rh  posted on 2022-12-04  in RabbitMQ

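I have a scenario with a separate ***CommandBus*** and ***EventBus***. A consumer listens for commands on the command bus and, after handling the operation, publishes the related events to the ***EventBus***. I would like built-in support for the transactional outbox pattern on the ***EventBus***.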
Here is the link to a repo
Here is the application's configuration:

public static void ConfigureServices(HostBuilderContext host, IServiceCollection services)
    {
        services.Configure<MessageBrokerConfiguration>(host.Configuration.GetSection("MessageBroker"));
        var brokerConfiguration = new MessageBrokerConfiguration();
        host.Configuration.Bind("MessageBroker", brokerConfiguration);
        
        services.AddHostedService<DatabaseMigratorHostedService>();
        
        services.AddMassTransit<ICommandBus>(mt =>
        {
            mt.UsingRabbitMq((context, configurator) =>
            {
                configurator.Host(brokerConfiguration.CommandBus);
                configurator.ConfigureEndpoints(context);
            });

            mt.AddConsumersFromNamespaceContaining<CreateOrderConsumer>();
        });
        
        services.AddMassTransit(mt =>
        {
            mt.AddEntityFrameworkOutbox<OrderContext>(options =>
            {
                options.QueryDelay = TimeSpan.FromSeconds(1);
                options.UsePostgres();
                options.UseBusOutbox();
            });
            
            mt.UsingRabbitMq((context, configurator) =>
            {
                configurator.Host(brokerConfiguration.EventBus);
                configurator.ConfigureEndpoints(context);
            });
        });

        services.AddRepositories(host.Configuration);
        services.AddScoped<IEventEmitter, MasstransitEventEmitter>();
    }

Here is my consumer, which listens for commands on one bus and publishes events to the other:

public sealed class CreateOrderConsumer
    : IConsumer<CreateOrder>
{
    private readonly IEventEmitter _eventEmitter;
    private readonly IUnitOfWork _unitOfWork;
    private readonly IRepository<Order> _repository;

    public CreateOrderConsumer(
        IRepository<Order> repository,
        IUnitOfWork unitOfWork,
        IEventEmitter eventEmitter)
    {
        _unitOfWork = Guard.Against.Null(unitOfWork);
        _repository = Guard.Against.Null(repository);
        _eventEmitter = Guard.Against.Null(eventEmitter);
    }

    public async Task Consume(ConsumeContext<CreateOrder> context)
    {
        var order = new Order(context.Message.ProductId, context.Message.Quantity);
        
        await _repository.StoreAsync(order);
        
        await _eventEmitter.Emit(order.DomainEvents);
        order.ClearDomainEvents();
        
        await _unitOfWork.CommitAsync();
        await context.RespondAsync<CreateOrderResult>(new { OrderId = order.Id });
    }
}

And my IEventEmitter is injected with an IBus:

public sealed class MasstransitEventEmitter : IEventEmitter
{
    private readonly IPublishEndpoint _publishEndpoint;

    public MasstransitEventEmitter(IBus publishEndpoint)
    {
        _publishEndpoint = Guard.Against.Null(publishEndpoint);
    }
    
    public async Task Emit(IEnumerable<IDomainEvent> domainEvents)
    {
        try
        {
            foreach (var domainEvent in domainEvents)
            {
                await _publishEndpoint.Publish(domainEvent, domainEvent.GetType(), CancellationToken.None);
            }
        }
        catch (Exception)
        {
            // ignored
        }
    }
}

Here is the DbContext used for both the business logic and the transactional outbox configuration:

public sealed class OrderContext : DbContext, IUnitOfWork
{
    public OrderContext(DbContextOptions<OrderContext> options) : base(options)
    {
    }

    internal DbSet<OrderEntity> Orders { get; private set; } = default!;

    public async Task CommitAsync(CancellationToken cancellationToken = default)
        => await this.SaveChangesAsync(cancellationToken);

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder);
        
        modelBuilder.ApplyConfiguration(new OrderEntityConfiguration());
        
        modelBuilder.AddInboxStateEntity();
        modelBuilder.AddOutboxMessageEntity();
        modelBuilder.AddOutboxStateEntity();
    }
}

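The API layer sends the command to the command bus through IRequestClient<CreateOrder> and waits for the response. The problem is that when the event bus (not the command bus) is down, the transactional outbox does not kick in, and the request hangs until a timeout exception occurs.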

[HttpPost]
    public async Task<IActionResult> Post(
        [FromBody] CreateOrderDto createOrderDto,
        [FromServices] IRequestClient<CreateOrder> createOrderRequestClient)
    {
        var result = await createOrderRequestClient.GetResponse<CreateOrderResult>(
            new CreateOrder{ ProductId = createOrderDto.ProductId,Quantity= createOrderDto.Quantity }, 
            timeout: RequestTimeout.After(m:2));
        return Ok(result);
    }

And the API's logs:

info: MassTransit[0]
      Bus started: rabbitmq://localhost/
info: Microsoft.Hosting.Lifetime[14]
      Now listening on: https://localhost:7129
info: Microsoft.Hosting.Lifetime[14]
      Now listening on: http://localhost:5134
info: Microsoft.Hosting.Lifetime[0]
      Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
      Hosting environment: Development
info: Microsoft.Hosting.Lifetime[0]
      Content root path: /Users/shahab/dev/talks/Demo.TransactionalOutbox/Demo.TransactionalOutbox.Api

Application layer logs (the service that listens for commands and publishes events):

[13:33:45 INF] Configured endpoint CancelOrder, Consumer: Demo.TransactionalOutbox.Application.Consumers.CancelOrderConsumer
[13:33:45 INF] Configured endpoint CreateOrder, Consumer: Demo.TransactionalOutbox.Application.Consumers.CreateOrderConsumer
[13:33:45 INF] Configured endpoint GetOrder, Consumer: Demo.TransactionalOutbox.Application.Consumers.GetOrderConsumer
[13:33:49 DBG] Starting bus instances: ICommandBus, IBus
[13:33:49 DBG] Starting bus: rabbitmq://localhost/
[13:33:49 DBG] Starting bus: rabbitmq://localhost:6666/
[13:33:49 DBG] Connect: guest@localhost:5672/
[13:33:49 DBG] Connect: guest@localhost:6666/
[13:33:49 DBG] Connected: guest@localhost:5672/ (address: amqp://localhost:5672, local: 49955)
[13:33:49 DBG] Connected: guest@localhost:6666/ (address: amqp://localhost:6666, local: 49954)
[13:33:50 DBG] Endpoint Ready: rabbitmq://localhost/McShahab_DemoTransactio_bus_5emoyydyan1f7qhobdppkkw6gp?temporary=true
[13:33:50 DBG] Endpoint Ready: rabbitmq://localhost:6666/McShahab_DemoTransactio_bus_5emoyydyan1f7jiabdppkkw9bz?temporary=true
[13:33:50 INF] Bus started: rabbitmq://localhost:6666/
[13:33:50 DBG] Declare queue: name: CancelOrder, durable, consumer-count: 0 message-count: 0
[13:33:50 DBG] Declare queue: name: CreateOrder, durable, consumer-count: 0 message-count: 0
[13:33:50 DBG] Declare queue: name: GetOrder, durable, consumer-count: 0 message-count: 0
[13:33:50 DBG] Declare exchange: name: GetOrder, type: fanout, durable
[13:33:50 DBG] Declare exchange: name: CreateOrder, type: fanout, durable
[13:33:50 DBG] Declare exchange: name: CancelOrder, type: fanout, durable
[13:33:50 DBG] Declare exchange: name: Demo.TransactionalOutbox.Domain.OrderAggregate.Queries:GetOrderStatus, type: fanout, durable
[13:33:50 DBG] Declare exchange: name: Demo.TransactionalOutbox.Domain.OrderAggregate.Commands:CancelOrder, type: fanout, durable
[13:33:50 DBG] Declare exchange: name: Demo.TransactionalOutbox.Domain.OrderAggregate.Commands:CreateOrder, type: fanout, durable
[13:33:50 DBG] Bind queue: source: GetOrder, destination: GetOrder
[13:33:50 DBG] Bind queue: source: CancelOrder, destination: CancelOrder
[13:33:50 DBG] Bind queue: source: CreateOrder, destination: CreateOrder
[13:33:50 DBG] Bind exchange: source: Demo.TransactionalOutbox.Domain.OrderAggregate.Commands:CreateOrder, destination: CreateOrder
[13:33:50 DBG] Bind exchange: source: Demo.TransactionalOutbox.Domain.OrderAggregate.Queries:GetOrderStatus, destination: GetOrder
[13:33:50 DBG] Bind exchange: source: Demo.TransactionalOutbox.Domain.OrderAggregate.Commands:CancelOrder, destination: CancelOrder
[13:33:50 DBG] Consumer Ok: rabbitmq://localhost/GetOrder - amq.ctag-jT06Ly0B8--gYF2XxxxyGQ
[13:33:50 DBG] Consumer Ok: rabbitmq://localhost/CreateOrder - amq.ctag-K2-6Gcdxk8z6UPxI0q-xQw
[13:33:50 DBG] Consumer Ok: rabbitmq://localhost/CancelOrder - amq.ctag-YRlkqWCWLKPX1JpCtEThJQ
[13:33:50 DBG] Endpoint Ready: rabbitmq://localhost/CreateOrder
[13:33:50 DBG] Endpoint Ready: rabbitmq://localhost/GetOrder
[13:33:50 INF] Bus started: rabbitmq://localhost/
[13:33:50 DBG] Endpoint Ready: rabbitmq://localhost/CancelOrder

And the logs of the event consumer:

[13:33:47 INF] Configured endpoint OrderCreated, Consumer: Demo.TransactionalOutbox.FancyConsumer.OrderCreatedConsumer
[13:33:48 DBG] Starting bus instances: IBus
[13:33:48 DBG] Starting bus: rabbitmq://localhost:6666/
[13:33:48 DBG] Connect: guest@localhost:6666/
[13:33:48 DBG] Connected: guest@localhost:6666/ (address: amqp://localhost:6666, local: 49947)
[13:33:48 DBG] Endpoint Ready: rabbitmq://localhost:6666/McShahab_DemoTransactio_bus_hrmoyydyan1fh45qbdppkkiyy5?temporary=true
[13:33:48 DBG] Declare queue: name: OrderCreated, durable, consumer-count: 0 message-count: 0
[13:33:48 DBG] Declare exchange: name: OrderCreated, type: fanout, durable
[13:33:48 DBG] Declare exchange: name: Demo.TransactionalOutbox.Domain.Events:OrderCreated, type: fanout, durable
[13:33:48 DBG] Bind queue: source: OrderCreated, destination: OrderCreated
[13:33:48 DBG] Bind exchange: source: Demo.TransactionalOutbox.Domain.Events:OrderCreated, destination: OrderCreated
[13:33:48 DBG] Consumer Ok: rabbitmq://localhost:6666/OrderCreated - amq.ctag-53c0dDTumv3l33VqwMiSpA
[13:33:48 DBG] Endpoint Ready: rabbitmq://localhost:6666/OrderCreated
[13:33:48 INF] Bus started: rabbitmq://localhost:6666/

Compared to the outbox sample application, I don't see any outbox-related logs, either when RabbitMQ is up and running or when it is down.

mzillmmw  #1

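Short answer: the transactional outbox only works with the primary (IBus) bus instance. Any other bus instance registered through MultiBus cannot use the transactional outbox at this time.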
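To make that constraint concrete, here is a minimal sketch of the one arrangement the statement above leaves room for: the command consumer and the outbox both registered on the primary bus. It assumes commands and events can share a single bus, which is not the asker's two-broker setup. `SingleBusSketch`, `AddSingleBusWithOutbox`, and the `brokerUri` parameter are hypothetical names; `OrderContext` and `CreateOrderConsumer` come from the question; and the exact `UseEntityFrameworkOutbox` overload may differ between MassTransit versions.

```csharp
using System;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public static class SingleBusSketch
{
    // Hypothetical helper: registers ONE primary bus that both consumes
    // commands and publishes events, so the outbox can apply to it.
    public static void AddSingleBusWithOutbox(IServiceCollection services, Uri brokerUri)
    {
        services.AddMassTransit(mt =>
        {
            // Same outbox options as in the question.
            mt.AddEntityFrameworkOutbox<OrderContext>(options =>
            {
                options.QueryDelay = TimeSpan.FromSeconds(1);
                options.UsePostgres();
                options.UseBusOutbox();
            });

            mt.AddConsumer<CreateOrderConsumer>();

            mt.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host(brokerUri);

                // Assumption: the endpoint is declared explicitly so the
                // outbox middleware can be attached to it; messages published
                // through the ConsumeContext are then held in the outbox
                // rather than sent straight to the broker.
                cfg.ReceiveEndpoint("CreateOrder", e =>
                {
                    e.UseEntityFrameworkOutbox<OrderContext>(context);
                    e.ConfigureConsumer<CreateOrderConsumer>(context);
                });
            });
        });
    }
}
```

Whether collapsing the two buses into one is acceptable depends on why they were separated in the first place; the sketch only shows where the outbox can attach, not a recommendation.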

Updated

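In your event emitter, you can't use IBus as the publish endpoint, because it isn't scoped. But you can't simply use IPublishEndpoint either, because it will most likely resolve to the ConsumeContext of the consumer on the command bus. The underlying plumbing that hooks up the transactional outbox isn't really set up to go from a consumer on one bus to producing events on another bus.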
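For illustration, this is roughly what a scoped emitter looks like. It is a sketch that only makes sense under the assumption that the consumer runs on the primary bus with the outbox configured on its endpoint; in the asker's two-bus layout the injected IPublishEndpoint would point at the command bus, as explained above. `ScopedEventEmitter` is a hypothetical name, and the `IEventEmitter` and `IDomainEvent` shapes are inferred from how the question uses them.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using MassTransit;

// Shapes inferred from the question's usage; omit these two declarations
// if the real interfaces are already available in your project.
public interface IDomainEvent { }

public interface IEventEmitter
{
    Task Emit(IEnumerable<IDomainEvent> domainEvents);
}

public sealed class ScopedEventEmitter : IEventEmitter
{
    private readonly IPublishEndpoint _publishEndpoint;

    // IPublishEndpoint is resolved from the current DI scope (inside a
    // consumer it resolves to that consumer's ConsumeContext), unlike IBus,
    // which is not scoped.
    public ScopedEventEmitter(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint
            ?? throw new ArgumentNullException(nameof(publishEndpoint));
    }

    public async Task Emit(IEnumerable<IDomainEvent> domainEvents)
    {
        foreach (var domainEvent in domainEvents)
        {
            // Publish by runtime type so subscribers bound to the concrete
            // event type receive the message. Exceptions are deliberately
            // not swallowed here, so a failed publish fails the consume.
            await _publishEndpoint.Publish(domainEvent, domainEvent.GetType());
        }
    }
}
```

The registration `services.AddScoped<IEventEmitter, MasstransitEventEmitter>()` from the question would then point at this class instead.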
