限制从服务器获取的RabbitMQ消息的数量

a5g8bdjr  于 2022-12-23  发布在  RabbitMQ
关注(0)|答案(2)|浏览(282)

我写了一个RabbitMQ的 Package 器,一切都很好,实际上太好了,我接收消息的速度比我处理它们的速度快。我如何限制从队列中获得的消息数量,或者更好,一次只使用和处理一条消息?

public void Consume()
{
  if (_channel != null)
   {
     // setup a listener for new messages
     var consumer = new EventingBasicConsumer(_channel);

     consumer.Received += (model, ea) =>
     {
       var body = ea.Body.ToArray();
       var message = Encoding.UTF8.GetString(body);
       var evt = new MessageEventArgs(body, message);

       OnMessageReceived(evt);
      };
     _channel.BasicConsume(queue: _queue, autoAck: true, consumer: consumer);
    }
}
vwhgwdsa

vwhgwdsa1#

要限制从队列中使用的消息数量,或者一次只使用和处理一条消息,您可以修改RabbitMQ Package 器中的Consume方法,如下所示:
1.将BasicConsume方法的autoAck参数设置为false。这将禁用消息的自动确认,这意味着消息将保留在队列中,直到手动确认。
1.在处理完消息后,在Received事件处理程序中添加对BasicAck方法的调用。这将向RabbitMQ发送确认消息已成功处理并可以从队列中删除。

public void Consume()
{
    if (_channel != null)
    {
        // setup a listener for new messages
        var consumer = new EventingBasicConsumer(_channel);

        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            var evt = new MessageEventArgs(body, message);

            OnMessageReceived(evt);

            // send acknowledgement to RabbitMQ that the message has been processed
            _channel.BasicAck(ea.DeliveryTag, false);
        };

        _channel.BasicConsume(queue: _queue, autoAck: false, consumer: consumer);
    }

}
通过这些更改,Consume方法一次将仅使用和处理一条消息,并且消息将保留在队列中,直到它们被手动确认。

yhuiod9q

yhuiod9q2#

限制使用的消息

//limit to 5 messages
channel.BasicQos(0, 5, false);

在此之后,您可以调用noAck参数为false的BasicConsume方法。

channel.BasicConsume(queue: _queue, autoAck: false, consumer: consumer);

相关问题