rabbitmq 家兔Mq消耗量

pgx2nnw8  于 2022-12-04  发布在  RabbitMQ
关注(0)|答案(1)|浏览(160)

我需要一个异步的消费者方法来消费来自RabbitMq的消息。我的问题是.net的rabbitmq客户端依赖于一个事件处理程序。我试图用一个信号量实现一个阻塞系统,它在低容量下工作。当我获得更多容量时,一些消息丢失了。
下面是我的实现:

private long _lock;
    private string _message;
    private object _tag;
    private readonly SemaphoreSlim _signal;

    public void Configure()
    {
        Interlocked.Exchange(ref _lock, 0);

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (sender, ea) =>
        {
            _message = Encoding.UTF8.GetString(ea.Body.ToArray());
            _tag = ea.DeliveryTag;
            Interlocked.Exchange(ref _lock, 1);
            _signal.Release();
        };

        _channel.BasicConsume(queue: _rabbitConfig.Queue, autoAck: true, consumer: consumer);
    }
    
    public async Task<string> Consume(CancellationToken cancellationToken)
    {
        while (0 == Interlocked.Read(ref _lock))
        {
            await _signal.WaitAsync();
        }
        Interlocked.Exchange(ref _lock, 0);
        return _message;
    }

我也尝试过使用BufferBlock,但是仍然丢失了一些消息。有没有其他的方法来实现一个保留Consume()方法的系统?

nfs0ujit

nfs0ujit1#

代码的问题在于,您使用共享变量(_message)来存储接收到的消息,并且没有使用锁或其他同步机制保护此变量。这意味着,如果多个线程同时调用Consume()方法,它们可能会并发访问和修改_message变量,从而导致数据争用和可能的数据丢失。
若要修正此问题,您可以使用lock陈述式来保护共用变数,并确保一次只有一个执行绪可以存取该变数。这样可以防止多个执行绪同时存取和修改该变数,并确保正确使用消息。
以下示例说明如何修改代码以使用lock语句保护共享变量:

private long _lock;
private string _message;
private object _tag;
private readonly SemaphoreSlim _signal;
private readonly object _syncRoot = new object();

public void Configure()
{
    Interlocked.Exchange(ref _lock, 0);

    var consumer = new EventingBasicConsumer(_channel);
    consumer.Received += (sender, ea) =>
    {
        lock (_syncRoot)
        {
            _message = Encoding.UTF8.GetString(ea.Body.ToArray());
            _tag = ea.DeliveryTag;
        }
        Interlocked.Exchange(ref _lock, 1);
        _signal.Release();
    };

    _channel.BasicConsume(

相关问题