C#,RabbitMq如何发送回处理后的消息

mcdcgff0  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(274)

我昨天才开始学习RabbitMQ,我不明白它是如何实现的。
这是我的制作人

public class RabbitMQProducerService: IMessageSender
    {
        public void SendMessage<T>(T message)
        {
            var factory = new ConnectionFactory { HostName = "localhost" };
            var connection = factory.CreateConnection();

            var channel = connection.CreateModel(); 
            channel.QueueDeclare("Input", exclusive: false);

            var json = JsonConvert.SerializeObject(message);
            var body = Encoding.UTF8.GetBytes(json);
            channel.BasicPublish(exchange: "", routingKey: "Input", basicProperties: null, body: body);
        }
    }

我的订户

string exepction = "";
StatisticOfTest statistic;

var factory = new ConnectionFactory
{
    HostName = "localhost"
};
var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare("Input", exclusive: false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, eventArgs) =>
{
    SolutionService solution = new SolutionService();
    var body = eventArgs.Body.ToArray();
    var jsonStr = Encoding.UTF8.GetString(body);
    SolutionAddModel solutionAdd = JsonConvert.DeserializeObject<SolutionAddModel>(jsonStr);

    if (String.IsNullOrEmpty(solution.Validating(solutionAdd)))
    {
        statistic = solution.Create(solutionAdd); //I want to return statistic
    }
    else
    {
        exepction = solution.Validating(solutionAdd); // Or Exception
    }
    Console.WriteLine($"Message received: {jsonStr}");
};
channel.BasicConsume(queue: "Input", autoAck: true, consumer: consumer);
Console.ReadKey();

根据结果,我想返回统计信息或异常。但是到另一个队列,例如,到某个输出队列

flseospp

flseospp1#

consumer.Received += (model, eventArgs) => { ... }
...是你声明当收到新消息时会发生什么的地方,那里的代码是Received事件的事件处理程序。
在您的例子中,您希望每个新消息都在某种条件下发布到队列中。这很容易做到,只需在事件处理程序代码中发布一条消息即可:

...
// Create a separate channel for sending messages to the output queue
var outputChannel = connection.CreateModel();

// Declare the output queue, you want to send messages to
outputChannel.QueueDeclare("Output", exclusive: false);

// Specify how the actual processing is going to happen
consumer.Received += (model, eventArgs) =>
{
    var body = GetMessageBody(eventArgs.Body); // This is where you get new message content based on the received message.

    outputChannel.BasicPublish(exchange: "", routingKey: "Input", basicProperties: null, body: body);
};
...

此外,查看您的生产者代码:你不需要在每次发送消息时都创建一个新的连接、一个新的通道和声明一个队列。你可以提前做一次。这将真正节省RabbitMQ的资源。
RabbitMQ也有一个连接限制,所以在将来的某个时候你的程序可能会停止工作。如果一个应用程序实际上对所有事情都使用一个连接会更好,如果可能的话,只是在需要的地方产生通道。
另外,不要忘记,IConnection和IModel实现的是IDisposable,因此您应该正确地处理它们。
您应该这样做:

// 'using' tells the compiler to generate a Dispose call for this object at the end of the current block.
using var connection = connectionFactory.CreateConnection();
uisng var channel = connection.CreateModel();
channel.QueueDeclare("Input", exclusive: false);
for (var i = 0; i < 100l i++)
    producer.SendMessage(i);

相关问题