在CPU长时间100%加载时处理RabbitMQ心跳

carvr3hs  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(188)

我在我的python应用程序中使用pika 1.1和graph-tool 3.4,它使用RabbitMQ的任务,然后用graph-tool构建图形,然后运行一些计算。
有些计算,比如betweenness,会占用大量的cpu资源,使得cpu的使用率在很长一段时间内达到100%。有时rabbitmq连接会掉下来,这会导致任务从头开始。
即使计算是在一个单独的进程中运行的,我的猜测是在cpu负载100%的时候,它找不到任何机会向rabbitmq发送心跳,这导致连接终止。这种情况并不总是发生,这表明它偶尔会发送心跳。这只是我的猜测,我不确定还有什么原因会导致这种情况。
我尝试使用nice(19)降低计算过程的优先级,但没有成功。我假设它不会影响graph-tool产生的过程,graph-tool会自行并行化工作。
因为它只有一行代码,graph.calculate_betweenness(...,所以我没有地方手动发送心跳或减慢执行速度来为心跳创造机会。
1.我猜测的因为cpu超级繁忙而没有发送心跳是正确的吗?
1.如果是,我该如何处理这种情况?

2hh7jdfx

2hh7jdfx1#

回答您的问题:
1.是的,基本上就是这样。
1.我们的解决方案是为CPU密集型任务创建一个单独的进程。

import time
from multiprocessing import Process

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)

def cpu_intensive_task(ch, method, properties, body):

    def work(body):
        time.sleep(60)  # If I remember well default HB is 30 seconds
        print(" [x] %r" % body)

    p = Process(target=work, args=(body,))
    p.start()
    # Important to notice if you do p.join() You will have the same problem.

channel.basic_consume(
    queue=queue_name, on_message_callback=cpu_intensive_task, auto_ack=True)

channel.start_consuming()

我想知道这是否是这个问题的最佳解决方案,或者rabbitMQ是否是处理CPU密集型任务的最佳工具。(对于非常长的CPU密集型任务(超过30分钟),如果您发送手动ACK,您还需要处理以下问题:https://www.rabbitmq.com/consumers.html#acknowledgement-timeout)

相关问题