我需要一个异步的消费者方法来消费来自RabbitMq的消息。我的问题是.net的rabbitmq客户端依赖于一个事件处理程序。我试图用一个信号量实现一个阻塞系统,它在低容量下工作。当我获得更多容量时,一些消息丢失了。
下面是我的实现:
private long _lock;
private string _message;
private object _tag;
private readonly SemaphoreSlim _signal;
public void Configure()
{
Interlocked.Exchange(ref _lock, 0);
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (sender, ea) =>
{
_message = Encoding.UTF8.GetString(ea.Body.ToArray());
_tag = ea.DeliveryTag;
Interlocked.Exchange(ref _lock, 1);
_signal.Release();
};
_channel.BasicConsume(queue: _rabbitConfig.Queue, autoAck: true, consumer: consumer);
}
public async Task<string> Consume(CancellationToken cancellationToken)
{
while (0 == Interlocked.Read(ref _lock))
{
await _signal.WaitAsync();
}
Interlocked.Exchange(ref _lock, 0);
return _message;
}
我也尝试过使用BufferBlock,但是仍然丢失了一些消息。有没有其他的方法来实现一个保留Consume()方法的系统?
1条答案
按热度按时间nfs0ujit1#
代码的问题在于,您使用共享变量(_message)来存储接收到的消息,并且没有使用锁或其他同步机制保护此变量。这意味着,如果多个线程同时调用Consume()方法,它们可能会并发访问和修改_message变量,从而导致数据争用和可能的数据丢失。
若要修正此问题,您可以使用lock陈述式来保护共用变数,并确保一次只有一个执行绪可以存取该变数。这样可以防止多个执行绪同时存取和修改该变数,并确保正确使用消息。
以下示例说明如何修改代码以使用lock语句保护共享变量: