我有一个大型项目遵循干净的架构解决方案布局和一些领域驱动设计/ CQRS方法。我已经遵循了米兰约万诺维奇的早期清洁建筑YouTube视频相当虔诚地到达这里,如果这有帮助。在集成Mass Transit之前,我使用了他的发件箱模式方法,该方法使用EF核心拦截器来拦截SavingChangesAsync方法,提取域在该事务期间引发的事件并将其保存到发件箱表中,如下所示:
public override ValueTask<InterceptionResult<int>> SavingChangesAsync(DbContextEventData eventData,
InterceptionResult<int> result, CancellationToken cancellationToken = default)
{
var context = eventData.Context;
if (context is null)
return base.SavingChangesAsync(eventData, result, cancellationToken);
var events = context.ChangeTracker
.Entries<AggregateRoot>()
.Select(a => a.Entity)
.Where(e => e.GetDomainEvents() is not null)
.SelectMany(e =>
{
var domainEvents = e.GetDomainEvents();
e.ClearDomainEvents();
return domainEvents;
});
var messages = events
.Where(e => e is not null)
.Select(e => new OutboxMessage()
{
Id = Guid.NewGuid(),
OccurredOnUtc = DateTime.UtcNow,
Type = e.GetType().Name,
Content = JsonConvert.SerializeObject(e, new JsonSerializerSettings
{
TypeNameHandling = TypeNameHandling.All
})
})
.ToList();
context.Set<OutboxMessage>().AddRange(messages);
return base.SavingChangesAsync(eventData, result, cancellationToken);
}
这些条目随后会被Quartz作业拾取,该作业每隔X秒运行一次,并使用MediatR的通知发布事件。我将不会详细介绍目前的这种方法,因为它绝对是无风险的。
我决定用Mass Transit / RabbitMQ来代替这个伟大的方法,使它成为一个真正的pub/sub,并摆脱10秒的延迟。以下是我如何注册公共交通:
private static void ConfigureMessageQueue(IServiceCollection services, IConfiguration configuration)
{
services.Configure<MessageBrokerSettings>(configuration.GetSection("MessageBroker"));
services.AddSingleton(sp =>
sp.GetRequiredService<IOptions<MessageBrokerSettings>>().Value);
services.AddMassTransit(busConfigurator =>
{
busConfigurator.SetKebabCaseEndpointNameFormatter();
busConfigurator.AddConsumersFromNamespaceContaining<ISomeApplicationMarker>();
busConfigurator.UsingRabbitMq((context, configurator) =>
{
var settings = context.GetRequiredService<MessageBrokerSettings>();
configurator.Host(new Uri(settings.Host), h =>
{
h.Username(settings.Username);
h.Password(settings.Password);
});
configurator.ConfigureEndpoints(context);
});
});
services.AddTransient<IEventBus, EventBus>();
}
事件总线只是IBus接口的一个抽象,以尽可能地将我的应用程序代码与Mass Transit / RabbitMQ解耦:
internal sealed class EventBus : IEventBus
{
private readonly IBus _bus;
private readonly ILogger<EventBus> _logger;
public EventBus(IBus bus, ILogger<EventBus> logger)
{
_bus = bus;
_logger = logger;
}
public async Task PublishAsync<TMessage>(TMessage message, CancellationToken cancellationToken = default)
where TMessage : class, IDomainEvent
{
_logger.LogDebug("Publishing event {@Event}", message);
await _bus.Publish(message, cancellationToken);
}
}
为了简洁起见,我不会展示我的消费者,但它们只是正确事件类型的IConsumer实现。
我已经做了一个新的Ef Core拦截器,它做了几乎相同的事情,除了它使用事件总线将事件发布到RabbitMQ,而不是将其保存在发件箱表中。
奇怪的是。
如果我直接从MediatR处理程序发布事件(因此完全跳过新的拦截器),一切都正常。消息会传到RabbitMQ,并返回给我的消费者,做我想做的事情。
如果我尝试使用我的拦截器,执行确实在正确的时间进入它,我看到事件总线的日志声明,事件已经发布,我看到消息击中RabbitMQ,但它从来没有触发消费者!据我所知,关于代理的一切都配置正确,因为它在从MediatR处理程序直接调用Publish时工作。我看到Mass Transit的日志声明,当应用程序启动时,端点已经配置并连接到正确的消费者,RabbitMQ正确地显示了交换和队列,一切都是一样的(显然,因为与从MediatR处理程序调用时相比,没有任何变化)。
对不起,这是一个很大的过程,但这一个真的没有意义的我。我很乐意提供更多的代码,如果它可以帮助。谢谢
编辑
这是我要求的拦截器。我尝试覆盖SavingChangesAsync和SavedChangesAsync,但没有成功:
public sealed class ConvertDomainEventsToMessageBrokerMessagesInterceptor : SaveChangesInterceptor
{
private readonly IEventBus _eventBus;
public ConvertDomainEventsToMessageBrokerMessagesInterceptor(IEventBus eventBus) => _eventBus = eventBus;
/*public async override ValueTask<InterceptionResult<int>> SavingChangesAsync(DbContextEventData eventData, InterceptionResult<int> result, CancellationToken cancellationToken = default)
{
var context = eventData.Context;
if (context is null)
return await base.SavingChangesAsync(eventData, result, cancellationToken);
var events = context.ChangeTracker
.Entries<AggregateRoot>()
.Select(a => a.Entity)
.Where(e => e.GetDomainEvents() is not null)
.SelectMany(e =>
{
var domainEvents = e.GetDomainEvents();
e.ClearDomainEvents();
return domainEvents;
});
var res = await base.SavingChangesAsync(eventData, result, cancellationToken);
foreach (var domainEvent in events)
await _eventBus.PublishAsync(domainEvent, cancellationToken);
return res;
}*/
public override async ValueTask<int> SavedChangesAsync(SaveChangesCompletedEventData eventData, int result, CancellationToken cancellationToken = default)
{
var context = eventData.Context;
if (context is null)
return await base.SavedChangesAsync(eventData, result, cancellationToken);
var events = context.ChangeTracker
.Entries<AggregateRoot>()
.Select(a => a.Entity)
.Where(e => e.GetDomainEvents() is not null)
.SelectMany(e =>
{
var domainEvents = e.GetDomainEvents();
e.ClearDomainEvents();
return domainEvents;
});
foreach(var domainEvent in events)
await _eventBus.PublishAsync(domainEvent, cancellationToken);
return await base.SavedChangesAsync(eventData, result, cancellationToken);
}
}
2条答案
按热度按时间hk8txs481#
MassTransit有自己的事务发件箱,应该使用它来代替MediatR的自定义实现。有一个sample available和附带的视频。
oxosxuxt2#
我发现了我的问题,从MediatR处理程序直接发布和从EF Core拦截器发布之间还有另一个区别:一个发布具体类型,另一个发布抽象类型!
我的所有事件都继承自IDomainEvent,我的AggregateRoot类中的列表是IDomainEvents的列表。当直接从处理程序发布时,我可以显式发布一个具体的Somewhere HappenedEvent类型,Mass Transit/RabbitMQ可以识别该类型。
我的拦截器访问IDomainEvent类型列表,最终发布为IDomainEvent,而不是具体类型。我尝试从拦截器发布一个显式的Somewhere HappenedEvent,它工作得很好。
我的问题最终是我使用了
Publish<TMessage>(TMessage message)
而不是Publish(object message)
,这是作为IDomainEvent类型发布的东西,而Mass Transit/RabbitMQ对此一无所知。一旦我切换了我的方法签名,一切就开始工作了。