RabbitMQ: How can multiple consumers receive messages from the same queue?

6qqygrtg  posted on 2022-11-23  in  RabbitMQ

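I run the producer, it generates N messages, and I see them on the dashboard. When I run the receiver, it receives all the messages from the queue, and the queue is left empty.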

static void Receive(string QueName)
{
    ConnectionFactory connectionFactory = new ConnectionFactory
    {
        HostName = HostName,
        UserName = UserName,
        Password = Password,
    };
    var connection = connectionFactory.CreateConnection();
    var channel = connection.CreateModel();
    // Prefetch count of 1: at most one unacknowledged message per consumer.
    channel.BasicQos(0, 1, false);
    MessageReceiver messageReceiver = new MessageReceiver(channel);
    // autoAck is false, so the receiver has to acknowledge each message itself.
    channel.BasicConsume(QueName, false, messageReceiver);
    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
}

// Receiver
public class MessageReceiver : DefaultBasicConsumer
{
    private readonly IModel _channel;

    public MessageReceiver(IModel channel)
    {
        _channel = channel;
    }

    public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
    {
        Console.WriteLine("------------------------------");
        Console.WriteLine("Consuming Message");
        Console.WriteLine(string.Concat("Message received from the exchange ", exchange));
        Console.WriteLine(string.Concat("Consumer tag: ", consumerTag));
        Console.WriteLine(string.Concat("Delivery tag: ", deliveryTag));
        Console.WriteLine(string.Concat("Routing tag: ", routingKey));

        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine(string.Concat("Message: ", message));
        Console.WriteLine("------------------------------");
        _channel.BasicAck(deliveryTag, false);
    }
}

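I need several producers that publish messages to the same queue, and several clients that receive messages from that queue. Messages will be removed by the queue TTL. But right now the first receiver takes all the messages from the queue. How can I achieve this?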


daupos2t  #1

The best solution is for each client to have its own queue, which can be given a message TTL or a queue expiration argument.
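
As a rough illustration of this suggestion, the sketch below gives one client its own server-named queue bound to a fanout exchange, with a message TTL and a queue expiry set through the `x-message-ttl` and `x-expires` queue arguments. It assumes the 6.x RabbitMQ.Client API (`IModel`, `CreateModel`) used elsewhere on this page; the exchange name "logs", the class name, the host, and both timeout values are placeholders chosen for the example, not values from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class PerClientQueueConsumer
{
    public static void Main()
    {
        // Assumed connection settings; adjust them for your broker.
        var factory = new ConnectionFactory { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        // A fanout exchange copies every message to each bound queue.
        channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);

        var arguments = new Dictionary<string, object>
        {
            { "x-message-ttl", 60000 }, // messages expire after 60 s in the queue (illustrative value)
            { "x-expires", 300000 }     // the queue is deleted after 5 min without use (illustrative value)
        };

        // An empty name asks the broker to generate a unique queue name for this client.
        string queueName = channel.QueueDeclare(queue: "", durable: false,
            exclusive: false, autoDelete: false, arguments: arguments).QueueName;
        channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (sender, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            Console.WriteLine($" [x] {queueName} received: {message}");
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };
        channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}
```

With this layout every running client sees every message, because each one reads from its own queue. Consumers that share a single queue instead split the messages between them.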


qlckcl4x  #2

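We use an "exchange" here only to demonstrate the exchange mechanism in the same example; the task itself does not really need it (see the Worker2 project, which works with another queue bound to the same exchange):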

channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);

Full consumer example

using System;
using System.Text;
using System.Threading;
using System.Timers;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class Worker
{
    public static void Main()    
    {
        // Test of timer handler
        System.Timers.Timer aTimer = new System.Timers.Timer();
        aTimer.Elapsed += new ElapsedEventHandler((source, e) 
                                => Console.Write("Timer Test"));
        aTimer.Interval=3000;
        // Test timer
        // aTimer.Enabled = true;
        
        var factory = new ConnectionFactory()        
        {
            HostName = "localhost", UserName="user", Password="password",
            // DispatchConsumersAsync = true        
        };
        var connection = factory.CreateConnection();
        
        // Add multiple consumers, so that queue can be processed "in
        // parallel"
        for (int i=1; i<10; i++)        
        {
            var j=i;
            var channel = connection.CreateModel();
            
            channel.ExchangeDeclare(exchange: "logs", type: 
                                    ExchangeType.Fanout);
            // QueueDeclare returns a QueueDeclareOk; take the name explicitly
            string queueName = channel.QueueDeclare("test1", durable: true, 
                            exclusive: false, autoDelete: false).QueueName; 
            
            // take 1 message per consumer
            channel.BasicQos(0, 1, false);
            
            channel.QueueBind(queue: queueName,
                    exchange: "logs",
                    routingKey: "");
            Console.WriteLine($" [*] Waiting for messages in {j}");
            
            var consumer = new EventingBasicConsumer(channel);
            
            consumer.Received += (model, ea) =>
            {
                byte[] body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine($" [x] Received in {j} -> {message} at {DateTime.Now}");
                
                // Thread.Sleep(dots * 1000);
                
                // await Task.Delay(3000);
                Thread.Sleep(10000); 
                // async works too
                
                if (j==5)                
                {
                    // Test special case of returning item to queue: in 
                    // this case we received the message, but did not process 
                    // it because of some reason.
                    // QOS is 1, so our consumer is already full. We need 
                    // to return the message to the queue, so that another 
                    // consumer can work with it
                    Console.WriteLine($"[-] CANT PROCESS {j} consumer! Error with -> {message}"); 
                    channel.BasicNack(deliveryTag: ea.DeliveryTag, 
                                            multiple: false, requeue: true);                
                }
                else                
                {
                    Console.WriteLine($" [x] Done {j} -> {message} at {DateTime.Now}");
                    
                    // here channel could also be accessed as 
                    // ((EventingBasicConsumer)model).Model
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, 
                                                multiple: false);                
                }            
            };
            channel.BasicConsume(queue: queueName, autoAck: false, 
                                consumer: consumer);        
        }
        
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();    
    }
}
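
The worker above only consumes, so exercising it needs something publishing to the "logs" exchange. A minimal producer sketch might look like the following; it assumes the same 6.x RabbitMQ.Client API and the same placeholder credentials as the worker, and the class name, message text, and message count are invented for illustration.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

class Producer
{
    public static void Main()
    {
        // Same placeholder connection settings as the worker example.
        var factory = new ConnectionFactory
        {
            HostName = "localhost", UserName = "user", Password = "password"
        };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        // Declaring the exchange is idempotent as long as the type matches.
        channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);

        for (int i = 1; i <= 20; i++)
        {
            byte[] body = Encoding.UTF8.GetBytes($"Message {i}");
            // A fanout exchange ignores the routing key, so it is left empty.
            channel.BasicPublish(exchange: "logs", routingKey: "",
                                 basicProperties: null, body: body);
            Console.WriteLine($" [x] Sent Message {i}");
        }
    }
}
```

A fanout exchange drops messages when no queue is bound to it, so the worker should be started at least once (which declares and binds "test1") before the producer runs. Because all the worker's consumers share the single queue "test1", each message should then go to only one of them.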

The full example is available at the link.
