Nodejs RabbitMQ服务器上的高性能

c0vxltue  于 2023-08-05  发布在  RabbitMQ
关注(0)|答案(2)|浏览(125)

我正在建立一个分析系统,有一百万用户同时在线。我使用RabbitMQ如消息代理来减少服务器的容量
这是我的图表
x1c 0d1x的数据
我的系统包括3个组件。

**发布服务器:(Producer)**本系统基于nodejs构建。本系统的目的是将消息发布到queue
RabbitMQ队列:此系统存储publisher server发送到的消息。之后,打开一个连接以从subscriber server的队列发送消息。
用户服务器(消费者):该系统接收来自queue的消息
发布服务器源代码

var amqp = require('amqplib/callback_api');
amqp.connect("amqp://localhost", function(error, connect) {
    if (error) {
        return callback(-1, null);
    } else {
       connect.createChannel(function(error, channel) {
       if (error) {
           return callback(-3, null);
       } else {
         var q = 'logs';
         var msg = data; // object
         // convert msg object to buffer 
         var new_msg = Buffer.from(JSON.stringify(msg), 'binary');

        channel.assertExchange(q, 'fanout', { durable: false });
        channel.publish(q, 'message_queues', new Buffer(new_msg));
       console.log(" [x] Sent %s", new_msg);
        return callback(null, msg);
      }
    });
   }
});

字符串
创建"message_queues""fanout"的独占交换以向所有消费者发送广播

用户服务器源代码

var amqp = require('amqplib/callback_api');
amqp.connect("amqp://localhost", function(error, connect) {
    if (error) {
        console.log('111');
    } else {
        connect.createChannel(function(error, channel) {
            if (error) {
                console.log('1');
            } else {
                var ex = 'logs';

                channel.assertExchange(ex, 'fanout', { durable: false });
                channel.assertQueue('message_queues', { exclusive: true }, function(err, q) {
                    if (err) {
                        console.log('123');
                    } else {
                        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
                        channel.bindQueue(q.queue, ex, 'message_queues');

                        channel.consume(q.queue, function(msg) {
                            console.log(" [x] %s", msg.content.toString());

                        }, { noAck: true });

                    }
                });
            }
        });
    }

});


"message_queues"交换机接收消息
当我实现发送消息时。系统运行良好,但是我尝试了这个系统的基准测试性能(每秒发送约1000个用户请求),然后系统出现了一些问题。系统似乎是超载/缓冲区溢出(或一些东西不工作)。
我两天前才读到rabbitmq。我知道它的教程是基本的例子,所以我需要帮助,以建立在真实的世界中的系统比。任何解决方案和建议
希望我的问题有意义

x8diyxa7

x8diyxa71#

你的问题很笼统。也许你应该提供更多的细节来帮助识别瓶颈并帮助你。所以,首先我认为你应该检查兔子的智商--它是否是一个瓶颈。有很多事情可能会出错:
1.可以消费消息的消费者数量太少(我假设您使用了一个消费者池)
1.网络太慢了
1.队列和消息在RabbitMQ和GoDoDisk的太多节点之间复制(可以像这样使用RabbitMQ)
1.消费者不能真正处理消息,它会不断地重新排队
所以,一般来说,在你的测试中,你应该检查兔子的mq,看看那里发生了什么。
一旦到达队列的消息处于Ready* 状态一旦发生这种情况,它将一直存在,直到连接到队列的消费者之一不会尝试将消息处理
当其中一个消费者(rabbit在它们之间进行循环)选择要处理的消息时,如果消费者未能处理该消息,它的状态将变为
Unacknowledged**,它将被rabbit重新排队,以便另一个消费者有机会处理该消息。
当然,如果消费者成功处理了消息,消息将从Rabbit mq服务器上消失。
假设你已经安装了rabbit mq web ui(我强烈推荐它,尤其是对初学者)你可以直观地看到你的队列中发生了什么你会看到有多少消息处于就绪状态,有多少消息未被确认。这将有助于识别瓶颈。
例如,如果您看到只有一条消息通常处于未确认状态,这可能意味着消费者无法处理该消息并将其发送回rabbit。另一方面,新消息总是从生产者到达,因此“就绪”消息的数量将增加得非常快,这也可以指出这样一个事实,即您只使用一个消费者,一次只能处理一条消息。因此,您可以考虑在这里进行并行处理,在不同的线程中运行多个消费者,甚至将您的应用程序集群化(在Rabbit中,消费者可以驻留在不同的机器上)
当然,我希望这对你有帮助,就像我之前说的,如果你有更具体的问题,请提供更多关于测试过程中发生的事情的信息

f4t66c6m

f4t66c6m2#

队列可能已经开始将其内容分页到磁盘-如果是这样,检索它们的速度会变慢。您可以查看管理插件中的队列详细信息页面,看看是否是这种情况。
有关某些配置,请参见http://www.rabbitmq.com/memory.html#memsup-paging。
参见:http://www.rabbitmq.com/blog/2011/10/27/performance-of-queues-when-less-is-more/
通常RabbitMQ队列更快,消息更少,原因在那里解释。
除此之外,该博客文章解释了如何配置基本的.qos设置以减少延迟。

相关问题