rabbitmq 客户端消费并发送到另一个队列

x4shl7ld  于 2023-04-30  发布在  RabbitMQ
关注(0)|答案(4)|浏览(260)

bounty 18小时后到期。回答此问题可获得+50声望奖励。PKCS12希望引起更多关注这个问题。

我正在尝试使用RabbitMQ在C#中使用RabbitMQ消息。客户端库并获取消息列表,并在返回数组之前将其添加到数组中。在此之后,我希望处理这个数组并将其发送到一个新队列。所以我的第一个方法如下:

public async Task<List<string>> ProcessMessages() 
{
var factory = new ConnectionFactory { HostName = "local"};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.QueueDeclare(queue: "myqueue");

var consumer = new EventingBasicConsumer(channel);

var messages= new List<string>();

consumer.Received += async (model, ea) =>
{
    var body = ea.Body.ToString();

    // Do something with the to the body...

    messages.Add(body);

};

channel.BasicConsume(queue: "myqueue", autoAck: acknowledge, consumer: consumer);

return messages;
}

返回的消息数组似乎总是空的。有人能帮我理解一下为什么吗?
我尝试的另一种方法是在现有的Received处理程序中执行:

public async Task ProcessMessages()
{
    var factory = new ConnectionFactory { HostName = "local" };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    channel.QueueDeclare(queue: "myqueue");

    var consumer = new EventingBasicConsumer(channel);

    consumer.Received += (model, ea) =>
    {
        var body = ea.Body.ToString();
        using var channel = connection.CreateModel();

        channel.QueueDeclare(queue: "NewQueue1",
                                durable: true,
                                exclusive: false,
                                autoDelete: false,
                                arguments: null);

        var newMsgBytes[] = new Byte[] // Do some processing of message and send new  message to new queue

            channel.BasicPublish(exchange: string.Empty,
                    routingKey: "NewQueue1",
                    basicProperties: null,
                    body: newMsgBytes);

    };

    channel.BasicConsume(queue: "myqueue", autoAck: acknowledge, consumer: consumer);

}

这似乎也不起作用,因为新消息没有发送到队列。有什么建议吗?谢谢

yacmzcpb

yacmzcpb1#

尝试以这种方式读取(而不是ea.Body.ToString(),使用ea.Body.ToArray()

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($" [x] Received {message}");
};

更多详情请点击这里https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

j2datikz

j2datikz2#

在第一个示例中,您没有向messages列表(返回)添加任何内容,只有receivedQMessages。除非您在其他地方修改messages,否则这可能就是您在“队列中”看不到任何内容的原因。
其次,请注意,如果您将autoAck设置为true,则消息将在收到后立即删除。

hl0ma9xz

hl0ma9xz3#

你应该将eventArgs主体转换为数组这是简单的消费者:

var factory = new ConnectionFactory {
HostName = "localhost"
};
//Create the RabbitMQ connection using connection factory details 
var connection = factory.CreateConnection();

var channel = connection.CreateModel();
//declare the queue after mentioning name and a few property related to that
channel.QueueDeclare("myqueue", exclusive: false);
//Set Event object which listen message from chanel which is sent by producer
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, eventArgs) => {
    var body = eventArgs.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($ "myqueue message received: {message}");
};
//read the message
channel.BasicConsume(queue: "myqueue", autoAck: true, consumer: consumer);
vsmadaxz

vsmadaxz4#

在第一种方法中:
Received事件是异步的,在单独的线程上运行。这意味着代码将在Received事件完成将消息添加到messages列表之前继续执行。因此,messages列表在返回时将始终为空。
使用像ConcurrentQueue<T>这样的线程安全集合来存储消息。
就像这样:

public async Task<List<string>> ProcessMessages() 
{
    var factory = new ConnectionFactory { HostName = "local" };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    channel.QueueDeclare(queue: "myqueue");

    var messages = new ConcurrentQueue<string>();

    var consumer = new EventingBasicConsumer(channel);

    consumer.Received += (model, ea) =>
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);

        // Do something with the message...

        messages.Enqueue(message);
    };

    channel.BasicConsume(queue: "myqueue", autoAck: acknowledge, consumer: consumer);

    return messages.ToList();
}

相关问题