我试图获得一个RabbitMQ设置,在那里我可以选择将消息发布到服务,或者作为扇出,或者直接。但是,当我发布到扇出交换时,我看到消息被传递到所有服务,而且是以循环方式传递的。因此,其中一个服务总是看到相同的消息两次。
下面是一个完整的repro:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace rabbitmq_exchanges_repro
{
class Program
{
static void Main(string[] args)
{
var hostName = "localhost";
var factory = new ConnectionFactory
{
AutomaticRecoveryEnabled = true,
HostName = hostName,
};
var connection = factory.CreateConnection();
var model = connection.CreateModel();
var serviceName = "service1";
// This queue is for round-robin messages distributed to instances of the service with the specified service name.
var directExchangeName = $"{serviceName}-direct";
model.QueueDeclare(
serviceName,
durable: true,
exclusive: false,
autoDelete: false);
model.ExchangeDeclare(
exchange: directExchangeName,
type: "direct",
durable: true,
autoDelete: false);
model.QueueBind(
queue: serviceName,
exchange: directExchangeName,
routingKey: string.Empty);
// This is for fanout messages distributed to all services with the specified service name.
var fanoutExchangeName = $"{serviceName}-fanout";
model.ExchangeDeclare(
exchange: fanoutExchangeName,
type: "fanout",
durable: true,
autoDelete: false);
var fanoutQueueName = model
.QueueDeclare()
.QueueName;
model.QueueBind(
queue: fanoutQueueName,
exchange: fanoutExchangeName,
routingKey: string.Empty);
var directConsumer = new EventingBasicConsumer(model);
var fanoutConsumer = new EventingBasicConsumer(model);
var workItemConsumerTag = model.BasicConsume(
queue: serviceName,
autoAck: true,
consumer: directConsumer);
var fanoutConsumerTag = model.BasicConsume(
queue: fanoutQueueName,
autoAck: true,
consumer: fanoutConsumer);
directConsumer.Received += (o, e) =>
{
Console.WriteLine("Received message (direct)");
};
fanoutConsumer.Received += (o, e) =>
{
Console.WriteLine("Received message (fanout)");
};
Console.WriteLine("[P]ublish");
Console.WriteLine("E[x]it");
var exit = false;
while (!exit)
{
var key = Console.ReadKey();
switch (key.Key)
{
case ConsoleKey.P:
model
.BasicPublish(
exchange: fanoutExchangeName,
routingKey: string.Empty,
body: new byte[] { 1, 2, 3 });
break;
case ConsoleKey.X:
exit = true;
break;
}
}
model.BasicCancel(workItemConsumerTag);
model.BasicCancel(fanoutConsumerTag);
model.Close();
model.Dispose();
connection.Close();
connection.Dispose();
}
}
}
在两个单独的控制台窗口中运行上述代码。如果在一个窗口中按P
,您将看到一个示例输出了我所期望的内容:
Received message (fanout)
但另一个窗口输出如下:
Received message (fanout)
Received message (direct)
尽管PublishBasic
调用指定了fanout交换名称。这是怎么回事我如何确保直接交易不涉及此案?
1条答案
按热度按时间bxjv4tth1#
我无法使用RabbitMQ 3.7.14和您的代码进行复制。我只在每个终端窗口中收到“已接收消息(扇出)”消息。也许RabbitMQ中有旧的绑定?您应该重置示例并重试。