我在另一个程序中做了一个生产者,它把数据推送到RabbitMQ队列中。它推类似json的格式,然后把它转换成序列化的对象。
var factory = new ConnectionFactory { HostName= "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(
queue: "consumption",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
Guid id = Guid.NewGuid();
Guid deviceId = new("43215a86-47af-4ccb-3832-08dabf2b75a9");
DateTime timestamp = DateTime.UtcNow;
var message = new
{
id = id,
deviceId = deviceId,
timestamp = timestamp,
energy_consumption = 1
};
var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
channel.BasicPublish("", "consumption", null, body);
当我尝试使用我的.net 6. 0 web api作为消费者读取队列的内容时-它什么也不返回,我不能理解为什么。
[HttpPost]
public void RabbitMQConsumer(Timestamp timestampRequest)
{
var factory = new ConnectionFactory
{
HostName = "localhost"
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.QueueDeclare(
queue: "consumption",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);
var consumer = new EventingBasicConsumer(channel);
byte[] body;
string message = null;
consumer.Received += (model, ea) =>
{
body = ea.Body.ToArray();
message = Encoding.UTF8.GetString(body);
};
//using debugger
//body -> null
//message - > null
if (message != null)
{
UpdateTimestamps(message);
}
channel.BasicConsume(queue: "consumption", autoAck: true, consumer: consumer);
}
'UpdateTimestamps()还没有完成,但它是一个空方法。我试着用调试器看看'body'和'message'在消费者之后是什么样子。收到了,但它们仍然是空的。如何解决这个问题?
我希望body或message能够成功地检索从生成器以任何形式发送的所有数据,因为即使它不是json格式,我也可以在之后解析消息。
1条答案
按热度按时间cyej8jka1#
在RabbitMQ中,队列的消费是异步的。在您的例子中,
RabbitMQConsumer
方法在Received
回调有机会做任何事情之前退出。您必须在.NET 6 Web API应用程序中设置一个长时间运行的使用者(很可能是在应用程序启动期间)。然后,当消息到达时,将其存储起来,以便在调用
RabbitMQConsumer
时读取。这是一个非常有趣的场景,如果你想通过git托管服务分享你的完整代码,我可以查看并给予反馈。我必须能够克隆、编译和运行你的项目。我是RabbitMQ .NET客户端的主要维护者之一。
**注意:**RabbitMQ团队监控
rabbitmq-users
邮件列表,仅在某些时候回答StackOverflow上的问题。