rabbitmq 为什么发布到扇出交换也发布到直接交换?

mzmfm0qo  于 2023-10-20  发布在  RabbitMQ
关注(0)|答案(1)|浏览(138)

我试图获得一个RabbitMQ设置,在那里我可以选择将消息发布到服务,或者作为扇出,或者直接。但是,当我发布到扇出交换时,我看到消息被传递到所有服务,而且是以循环方式传递的。因此,其中一个服务总是看到相同的消息两次。
下面是一个完整的repro:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace rabbitmq_exchanges_repro
{
    class Program
    {
        static void Main(string[] args)
        {
            var hostName = "localhost";
            var factory = new ConnectionFactory
            {
                AutomaticRecoveryEnabled = true,
                HostName = hostName,
            };

            var connection = factory.CreateConnection();
            var model = connection.CreateModel();

            var serviceName = "service1";

            // This queue is for round-robin messages distributed to instances of the service with the specified service name.
            var directExchangeName = $"{serviceName}-direct";
            model.QueueDeclare(
                serviceName,
                durable: true,
                exclusive: false,
                autoDelete: false);
            model.ExchangeDeclare(
                exchange: directExchangeName,
                type: "direct",
                durable: true,
                autoDelete: false);
            model.QueueBind(
                queue: serviceName,
                exchange: directExchangeName,
                routingKey: string.Empty);

            // This is for fanout messages distributed to all services with the specified service name.
            var fanoutExchangeName = $"{serviceName}-fanout";
            model.ExchangeDeclare(
                exchange: fanoutExchangeName,
                type: "fanout",
                durable: true,
                autoDelete: false);
            var fanoutQueueName = model
                .QueueDeclare()
                .QueueName;
            model.QueueBind(
                queue: fanoutQueueName,
                exchange: fanoutExchangeName,
                routingKey: string.Empty);

            var directConsumer = new EventingBasicConsumer(model);
            var fanoutConsumer = new EventingBasicConsumer(model);
            var workItemConsumerTag = model.BasicConsume(
                queue: serviceName,
                autoAck: true,
                consumer: directConsumer);
            var fanoutConsumerTag = model.BasicConsume(
                queue: fanoutQueueName,
                autoAck: true,
                consumer: fanoutConsumer);

            directConsumer.Received += (o, e) =>
            {
                Console.WriteLine("Received message (direct)");
            };
            fanoutConsumer.Received += (o, e) =>
            {
                Console.WriteLine("Received message (fanout)");
            };

            Console.WriteLine("[P]ublish");
            Console.WriteLine("E[x]it");
            var exit = false;

            while (!exit)
            {
                var key = Console.ReadKey();

                switch (key.Key)
                {
                    case ConsoleKey.P:
                        model
                            .BasicPublish(
                                exchange: fanoutExchangeName,
                                routingKey: string.Empty,
                                body: new byte[] { 1, 2, 3 });
                        break;
                    case ConsoleKey.X:
                        exit = true;
                        break;
                }
            }

            model.BasicCancel(workItemConsumerTag);
            model.BasicCancel(fanoutConsumerTag);

            model.Close();
            model.Dispose();

            connection.Close();
            connection.Dispose();
        }
    }
}

在两个单独的控制台窗口中运行上述代码。如果在一个窗口中按P,您将看到一个示例输出了我所期望的内容:

Received message (fanout)

但另一个窗口输出如下:

Received message (fanout)
Received message (direct)

尽管PublishBasic调用指定了fanout交换名称。这是怎么回事我如何确保直接交易不涉及此案?

bxjv4tth

bxjv4tth1#

我无法使用RabbitMQ 3.7.14和您的代码进行复制。我只在每个终端窗口中收到“已接收消息(扇出)”消息。也许RabbitMQ中有旧的绑定?您应该重置示例并重试。

相关问题