我尝试在应用程序运行时创建新队列后使用新消息。
1.在我的应用中创建新项目时,服务发布新ItemCreatedMessage
ItemCreatedMessage
由CreateItemQueuesConsumer
消费者消费
2.1.在CreateItemQueuesConsumer
中,我将ConnectReceiveEndpoint
配置为设置队列名称,并使用第二个参数IReceiveEndpointConfigurator
将消费者与消费者工厂方法连接起来
1.我的队列成功创建,绑定到我的交易所,我可以在RabbitMQ控制台中看到它们
1.但是,当我将新消息发布到我的队列时,该消息将由我在工厂方法中配置的消费者使用,消费者抛出System.ObjectDisposedException
这是我的以下代码:
// CreateMachineQueuesConsumer.cs
public async Task Consume(ConsumeContext<MachineCreatedMessage> context)
{
var message = context.Message;
var machineId = message.MachineId;
var cancellationToken = context.CancellationToken;
using var provider = this.serviceProvider.CreateScope();
var createMachineStatusQueue = this.bus.ConnectReceiveEndpoint(
string.Format("update_machine_status__{0}", machineId),
x =>
{
if (x is IRabbitMqReceiveEndpointConfigurator c)
{
c.ConfigureConsumeTopology = false;
c.ConcurrentMessageLimit = 1;
c.PrefetchCount = 1;
c.Consumer(() =>
{
return provider.ServiceProvider
.GetRequiredService<UpdateMachineStatusConsumer>();
});
c.Bind("machine_status_cycle", s =>
{
s.RoutingKey = machineId.ToString();
s.ExchangeType = ExchangeType.Direct;
});
}
});
var anotherQueues = ...
字符串
当我重新加载我的应用程序时,消息会被使用,因为我有一个后台服务,它会在我的应用程序构建时创建新队列。
2条答案
按热度按时间jdzmm42g1#
要连接接收端点,您应该在消费者中使用
IReceiveEndpointConnector
。字符串
此外,在
AddMassTransit
中,请确保同时调用:x.AddConsumer<ThisConsumer>();
和x.AddConsumer<UpdateMachineStatusConsumer>();
这在文档中有所涉及。
jslywgbw2#
如果我不得不从你提供的代码中猜测,嫌疑人应该是这样的代码:
字符串
在lambda
x =>
的作用域中,这反过来又引用了一个在两者作用域之外的一次性变量。尝试将“provider”服务定位器移动到访问它的作用域中:型