按需浏览RabbitMQ队列,并在之后处理数据

vkc1a9a2  于 2023-05-06  发布在  RabbitMQ
关注(0)|答案(1)|浏览(203)

bounty已结束。此问题的答案有资格获得+200声望奖励。赏金宽限期1小时后结束。PKCS12想要引起更多关注这个问题:需要帮助找到稳定、可靠的解决方案。可能是一个完整的重构

尝试读取队列。它在队列中有4条消息,每条消息都很小-就像“hello world”一样小!奇怪的是,有时它很慢,有时它很快,我不知道为什么。我猜这不是一个浏览队列的好方法,但对于我的目的,我不能以正常的方式使用队列-我需要按需获取队列的内容,然后处理它。谁能帮帮我?

public async Task<new List<string>()>> GetMessages()
{
    var factory = new ConnectionFactory { HostName = "localhost"};
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    var queueDeclareResponse = channel.QueueDeclare("myQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);

    var receivedMessages = new List<string>();
    var tcs = new TaskCompletionSource<List<string>>();

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var message = Encoding.UTF8.GetString(ea.Body.ToArray());

        receivedMessages.Add(message);

        if (receivedMessages.Count == queueDeclareResponse.MessageCount)
        {
            tcs.SetResult(receivedMessages);
        }
    };

    channel.BasicConsume(queueName, autoAck: false, consumer: consumer);

    // Wait for the consumer to finish processing messages
    var result = await tcs.Task;

    return result; // RETURN TO CALLER TO DO STUFF WITH LIST OF MSGS...
}
ergxz8rk

ergxz8rk1#

我会这样做:

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;

namespace ConsoleApp1
{
    public class Program
    {
        public static async Task Main()
        {
            var factory = new ConnectionFactory { HostName = "localhost"};
            using var connection = factory.CreateConnection();

            var all = await GetAllMessagesAsync(connection, "myQueue", CancellationToken.None);
        }

        private static async Task<List<string>> GetAllMessagesAsync(IConnection connection, string queueName, CancellationToken ct)
        {
            await Task.Yield();
            using var model = connection.CreateModel();
            model.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
            model.BasicQos(0, 1, false);
            var result = new List<string>();
            while (true)
            {
                try
                {
                    var tmp = await GetNextMessageAsync(model, queueName, ct, 
                        TimeSpan.FromSeconds(1),
                        TimeSpan.FromMilliseconds(100));
                    if (tmp == null)
                        break;

                    var body = Encoding.UTF8.GetString(tmp.Body.Span);
                    result.Add(body);
                }
                catch (OperationCanceledException) when (ct.IsCancellationRequested)
                {
                    throw;
                }
            }
            return result;
        }

        private static async Task<BasicGetResult> GetNextMessageAsync(IModel model, string queueName, CancellationToken ct, TimeSpan timeout, TimeSpan checkInterval)
        {
            ct.ThrowIfCancellationRequested();
            
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            cts.CancelAfter(timeout);
            while (true)
            {
                var tmp = model.BasicGet(queueName, false);
                if (tmp != null)
                    return tmp;

                try
                {
                    await Task.Delay(checkInterval, cts.Token).ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    return model.BasicGet(queueName, false);
                }
            }
        }
    }
}

队列一般不会被设计成查看消息的“数据库”,RabbitMQ -就是其中一种情况,它不是设计来拉取消息的,而且basic.get效率不高(它不能让你批量获取消息),但对于UI/小队列应该没问题。

PS

从我在各种队列中的经验来看,RabbitMQ是其中最差的

  • 它在框架和服务器配置方面过于复杂,占用大量资源,难以管理。
  • 可以split-brain occasionally(这在实践中会导致所有/部分消息丢失),难以进行手动恢复。
  • 内存溢出在两端也很常见,需要持续监控
  • 客户是有状态的,有时他们可以到处闲逛。不重新启动服务就无法恢复。需要通过运行状况检查进行持续监测。
  • 你可以忘记优雅关机,客户端挂在后台。事件更糟-在终结器部分。
  • 具有大量消费者/生产者的队列导致连接流失。
    尽量使用Kafka
  • 它很简单(它实际上只是一堆日志文件和带有偏移量的消费者)
  • 提供相同的性能(在三个节点上超过10- 100 k rps是很容易的)
  • 可以做RabbitMQ可以做的一切以及经纪人应该做的事情。
  • 允许您以任何方式(通过水印)查看队列,许多云提供商默认将其作为其资产之一。
  • 具有第三方UI来查看/管理主题/分区/消息

这里的对比非常明显:https://www.projectpro.io/article/kafka-vs-rabbitmq/451

相关问题