我正在一个.net core 6 web应用程序上使用带有RabbitMQ的MassTransit。我的目标是使一个应用程序的多个示例在不同的工厂上运行时保持同步。该应用程序需要能够发布/使用消息。
当一个站点发布了一些东西,这是广播到所有的站点队列(也是自己,它会简单地放弃消息)。
为此,我配置了带有工厂后缀的MassTransit队列名称:例如norm-queue-CV、norm-queue-MB。我还配置了Consumer以绑定到通用扇出交换名称(norm-exchange)。
以下是我的配置摘录:
public void ConfigureServices(IServiceCollection services)
{
services.AddMassTransit(x =>
{
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(new Uri(_configuration["RabbitMQ:URI"] + _configuration["RabbitMQ:VirtualHost"]), $"ENG {_configuration["Application:PlantID"]} Producer", h =>
{
h.Username(_configuration["RabbitMQ:UserName"]);
h.Password(_configuration["RabbitMQ:Password"]);
});
cfg.Publish<NormCreated>(x =>
{
x.Durable = true;
x.AutoDelete = false;
x.ExchangeType = "fanout"; // default, allows any valid exchange type
});
}));
});
// consumer
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(new Uri(_configuration["RabbitMQ:URI"] + _configuration["RabbitMQ:VirtualHost"]), $"ENG {_configuration["Application:PlantID"]} Consumer", h =>
{
h.Username(_configuration["RabbitMQ:UserName"]);
h.Password(_configuration["RabbitMQ:Password"]);
});
cfg.ReceiveEndpoint($"norm-queue-{_configuration["Application:PlantID"]}", e =>
{
e.Consumer<NormConsumer>();
e.UseConcurrencyLimit(1);
e.UseMessageRetry(r => r.Intervals(100, 200, 500, 800, 1000));
e.Bind<NormCreated>();
e.Bind("norm-exchange");
});
});
busControl.Start();
NormConsumer是如何定义的
public class NormConsumer : IConsumer<NormCreated>
{
private readonly ILogger<NormConsumer>? logger;
public NormConsumer()
{
}
public NormConsumer(ILogger<NormConsumer> logger)
{
this.logger = logger;
}
public async Task Consume(ConsumeContext<NormCreated> context)
{
logger.LogInformation("Norm Submitted: {NormID}", context.Message.NormID);
//await context.Publish<NormCreated>(new
//{
// context.Message.OrderId
//});
}
}
这里的队列是自动创建的。对我来说,它们看起来很好x1c 0d1x
这里创建了交换。我试图只得到一个交换(norm-exchange),但也创建了其他2个。
我的问题是首先要了解我的布局是否有意义(我对Rabbit/Masstransit还很陌生)。
此外,我想覆盖交换的命名方式,强制该队列只有一个交换:“norm-exchange”。我试图在“producer”部分覆盖它,但无法做到这一点
1条答案
按热度按时间jslywgbw1#
RabbitMQ代理拓扑在RabbitMQ - The Details和文档中有详细介绍。
您不需要在接收端点中呼叫
Bind
,因为已经为您系结消费者消息类型。请移除这两个Bind
陈述式,所有已出版的消息都会依类型路由至接收端点。