rabbitmq 如何从Masstransit中的SendFilter访问ConsumeContext?

ldioqlga  于 2023-08-05  发布在  RabbitMQ
关注(0)|答案(1)|浏览(133)

这个问题在here上讨论过,并由@chris-patterson修复,但我真的不知道如何应用到我的代码中。
总结:
我有一个场景,添加令牌每个事件的头部之前发送。我可以在Masstransit中使用Send Scoped Filter来实现这一点。因此,在SendFilter中,我从HttpContext获取Token并添加标头,然后我可以在通过ConsumeContextConsume Scoped Filter消费时从标头获取此令牌。到目前为止,一切都很好,但当我需要在消费期间发送另一个事件时,这次在SendFilter中我无法到达ConsumeContext,我得到了MissingConsumeContext Exception。我已经看到为SendFilterConsumeFilter创建了一个作用域对象。但我就是不喜欢。这是合并消费和发送过滤器的链接。这个repo实际上与我的用例相同,唯一的区别是我没有使用请求/响应客户端发送消息。
我的代码看起来像:

public class TokenSendFilter<T> : IFilter<SendContext<T>> where T : class
    {

        private MyDependency myDependency;
        private readonly IWorkContext _workcontext;

        public TokenSendFilter(MyDependency dependency, IWorkContext workcontext)
        {
            myDependency = dependency;
            _workcontext = workcontext;
        }

        public Task Send(SendContext<T> context, IPipe<SendContext<T>> next)
        {
            if (!string.IsNullOrWhiteSpace(_workcontext.Token))
            {
                myDependency.Token = _workcontext.Token;
                context.Headers.Set("Token", _workcontext.Token);
            }
            else if((!string.IsNullOrWhiteSpace(myDependency.Token)))
            {
                context.Headers.Set(key: "Token", myDependency.Token);
            }
            return next.Send(context);
        }
        public void Probe(ProbeContext context)
        {
        }
    }

public class TokenConsumeFilter<T> : IFilter<ConsumeContext<T>> where T : class
    {
        private MyDependency myDependency;

        public TokenConsumeFilter(MyDependency dependency)
        {
            myDependency = dependency;
        }

        public Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
        {

            if (context.Headers.TryGetHeader("Token", out object token))
            {
                myDependency.Token = (string)token;
            }

            return next.Send(context);
        }

        public void Probe(ProbeContext context)
        {
        }
    }

字符串

**IWorkContext只是获取HttpContext头值的抽象。

注册作用域筛选器:

services.AddScoped<MyDependency>();

services.AddMassTransit(x =>
            {
                x.AddConsumer<MyConsumer>();

                x.UsingRabbitMq((context, cfg) =>
                {
                    cfg.UseSendFilter(typeof(TokenSendFilter<>), context);
                    cfg.UseConsumeFilter(typeof(TokenConsumeFilter<>), context);

                    cfg.ConfigureEndpoints(context);
                });
            });


发送事件:

var address = new Uri($"exchange:{exchangeName}");

var endpoint = await _bus.GetSendEndpoint(address);

await endpoint.Send(The Message);


我也尝试了this,但它没有在SendContext Filter中找到ConsumeContext负载。
我相信SendFiler超出了ConsumeContext的范围,所以我需要在发送另一个事件时获取ConsumeContext对象。
Masstransit版本(8.05)
任何建议将是伟大的。先谢了。

hsvhsicv

hsvhsicv1#

IBus没有作用域,所以使用_bus.GetSendEndpoint()是不起作用的。必须使用ISendEndpointProvider,它是有作用域的,将在当前作用域中解析。
此外,v8.0.5是旧的,我建议升级到最新版本。

相关问题