bounty 18小时后到期。回答此问题可获得+50声望奖励。PKCS12希望引起更多关注这个问题。
我正在尝试使用RabbitMQ在C#中使用RabbitMQ消息。客户端库并获取消息列表,并在返回数组之前将其添加到数组中。在此之后,我希望处理这个数组并将其发送到一个新队列。所以我的第一个方法如下:
public async Task<List<string>> ProcessMessages()
{
var factory = new ConnectionFactory { HostName = "local"};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "myqueue");
var consumer = new EventingBasicConsumer(channel);
var messages= new List<string>();
consumer.Received += async (model, ea) =>
{
var body = ea.Body.ToString();
// Do something with the to the body...
messages.Add(body);
};
channel.BasicConsume(queue: "myqueue", autoAck: acknowledge, consumer: consumer);
return messages;
}
返回的消息数组似乎总是空的。有人能帮我理解一下为什么吗?
我尝试的另一种方法是在现有的Received处理程序中执行:
public async Task ProcessMessages()
{
var factory = new ConnectionFactory { HostName = "local" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "myqueue");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToString();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "NewQueue1",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var newMsgBytes[] = new Byte[] // Do some processing of message and send new message to new queue
channel.BasicPublish(exchange: string.Empty,
routingKey: "NewQueue1",
basicProperties: null,
body: newMsgBytes);
};
channel.BasicConsume(queue: "myqueue", autoAck: acknowledge, consumer: consumer);
}
这似乎也不起作用,因为新消息没有发送到队列。有什么建议吗?谢谢
4条答案
按热度按时间yacmzcpb1#
尝试以这种方式读取(而不是
ea.Body.ToString()
,使用ea.Body.ToArray()
)更多详情请点击这里https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
j2datikz2#
在第一个示例中,您没有向
messages
列表(返回)添加任何内容,只有receivedQMessages
。除非您在其他地方修改messages
,否则这可能就是您在“队列中”看不到任何内容的原因。其次,请注意,如果您将
autoAck
设置为true,则消息将在收到后立即删除。hl0ma9xz3#
你应该将eventArgs主体转换为数组这是简单的消费者:
vsmadaxz4#
在第一种方法中:
Received
事件是异步的,在单独的线程上运行。这意味着代码将在Received
事件完成将消息添加到messages
列表之前继续执行。因此,messages
列表在返回时将始终为空。使用像
ConcurrentQueue<T>
这样的线程安全集合来存储消息。就像这样: