我昨天才开始学习RabbitMQ,我不明白它是如何实现的。
这是我的制作人
public class RabbitMQProducerService: IMessageSender
{
public void SendMessage<T>(T message)
{
var factory = new ConnectionFactory { HostName = "localhost" };
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.QueueDeclare("Input", exclusive: false);
var json = JsonConvert.SerializeObject(message);
var body = Encoding.UTF8.GetBytes(json);
channel.BasicPublish(exchange: "", routingKey: "Input", basicProperties: null, body: body);
}
}
我的订户
string exepction = "";
StatisticOfTest statistic;
var factory = new ConnectionFactory
{
HostName = "localhost"
};
var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare("Input", exclusive: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, eventArgs) =>
{
SolutionService solution = new SolutionService();
var body = eventArgs.Body.ToArray();
var jsonStr = Encoding.UTF8.GetString(body);
SolutionAddModel solutionAdd = JsonConvert.DeserializeObject<SolutionAddModel>(jsonStr);
if (String.IsNullOrEmpty(solution.Validating(solutionAdd)))
{
statistic = solution.Create(solutionAdd); //I want to return statistic
}
else
{
exepction = solution.Validating(solutionAdd); // Or Exception
}
Console.WriteLine($"Message received: {jsonStr}");
};
channel.BasicConsume(queue: "Input", autoAck: true, consumer: consumer);
Console.ReadKey();
根据结果,我想返回统计信息或异常。但是到另一个队列,例如,到某个输出队列
1条答案
按热度按时间flseospp1#
consumer.Received += (model, eventArgs) => { ... }
...
是你声明当收到新消息时会发生什么的地方,那里的代码是Received
事件的事件处理程序。在您的例子中,您希望每个新消息都在某种条件下发布到队列中。这很容易做到,只需在事件处理程序代码中发布一条消息即可:
此外,查看您的生产者代码:你不需要在每次发送消息时都创建一个新的连接、一个新的通道和声明一个队列。你可以提前做一次。这将真正节省RabbitMQ的资源。
RabbitMQ也有一个连接限制,所以在将来的某个时候你的程序可能会停止工作。如果一个应用程序实际上对所有事情都使用一个连接会更好,如果可能的话,只是在需要的地方产生通道。
另外,不要忘记,IConnection和IModel实现的是IDisposable,因此您应该正确地处理它们。
您应该这样做: