这个问题在here上讨论过,并由@chris-patterson修复,但我真的不知道如何应用到我的代码中。
总结:
我有一个场景,添加令牌每个事件的头部之前发送。我可以在Masstransit中使用Send Scoped Filter
来实现这一点。因此,在SendFilter中,我从HttpContext
获取Token并添加标头,然后我可以在通过ConsumeContext
或Consume Scoped Filter
消费时从标头获取此令牌。到目前为止,一切都很好,但当我需要在消费期间发送另一个事件时,这次在SendFilter中我无法到达ConsumeContext,我得到了MissingConsumeContext Exception
。我已经看到为SendFilter
和ConsumeFilter
创建了一个作用域对象。但我就是不喜欢。这是合并消费和发送过滤器的链接。这个repo实际上与我的用例相同,唯一的区别是我没有使用请求/响应客户端发送消息。
我的代码看起来像:
public class TokenSendFilter<T> : IFilter<SendContext<T>> where T : class
{
private MyDependency myDependency;
private readonly IWorkContext _workcontext;
public TokenSendFilter(MyDependency dependency, IWorkContext workcontext)
{
myDependency = dependency;
_workcontext = workcontext;
}
public Task Send(SendContext<T> context, IPipe<SendContext<T>> next)
{
if (!string.IsNullOrWhiteSpace(_workcontext.Token))
{
myDependency.Token = _workcontext.Token;
context.Headers.Set("Token", _workcontext.Token);
}
else if((!string.IsNullOrWhiteSpace(myDependency.Token)))
{
context.Headers.Set(key: "Token", myDependency.Token);
}
return next.Send(context);
}
public void Probe(ProbeContext context)
{
}
}
public class TokenConsumeFilter<T> : IFilter<ConsumeContext<T>> where T : class
{
private MyDependency myDependency;
public TokenConsumeFilter(MyDependency dependency)
{
myDependency = dependency;
}
public Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
{
if (context.Headers.TryGetHeader("Token", out object token))
{
myDependency.Token = (string)token;
}
return next.Send(context);
}
public void Probe(ProbeContext context)
{
}
}
字符串
**IWorkContext只是获取HttpContext头值的抽象。
注册作用域筛选器:
services.AddScoped<MyDependency>();
services.AddMassTransit(x =>
{
x.AddConsumer<MyConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.UseSendFilter(typeof(TokenSendFilter<>), context);
cfg.UseConsumeFilter(typeof(TokenConsumeFilter<>), context);
cfg.ConfigureEndpoints(context);
});
});
的
发送事件:
var address = new Uri($"exchange:{exchangeName}");
var endpoint = await _bus.GetSendEndpoint(address);
await endpoint.Send(The Message);
型
我也尝试了this,但它没有在SendContext Filter中找到ConsumeContext负载。
我相信SendFiler超出了ConsumeContext的范围,所以我需要在发送另一个事件时获取ConsumeContext对象。
Masstransit版本(8.05)
任何建议将是伟大的。先谢了。
1条答案
按热度按时间hsvhsicv1#
IBus
没有作用域,所以使用_bus.GetSendEndpoint()
是不起作用的。必须使用ISendEndpointProvider
,它是有作用域的,将在当前作用域中解析。此外,v8.0.5是旧的,我建议升级到最新版本。