我们试图建立一个基本的定向队列系统,其中生产者将生成多个任务,一个或多个消费者将一次获取一个任务,处理它,并确认消息。
问题是,处理过程可能需要10-20分钟,而我们当时没有响应消息,导致服务器断开我们的连接。
下面是一些针对消费者的伪代码:
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
long_running_task(connection)
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
在第一个任务完成后,在BlockingConnection内部深处的某个地方抛出一个异常,抱怨套接字被重置。此外,RabbitMQ日志显示消费者由于没有及时响应而断开连接(为什么它重置连接而不是发送FIN很奇怪,但我们不会担心这个)。
我们搜索了很多,因为我们认为这是RabbitMQ的正常用例(有很多长时间运行的任务,应该在许多消费者之间分配),但似乎没有其他人真正遇到这个问题。最后,我们偶然发现了一个建议使用heartbeats并在单独线程中生成long_running_task()
的线程。
所以代码变成了:
#!/usr/bin/env python
import pika
import time
import threading
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost',
heartbeat_interval=20))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def thread_func(ch, method, body):
long_running_task(connection)
ch.basic_ack(delivery_tag = method.delivery_tag)
def callback(ch, method, properties, body):
threading.Thread(target=thread_func, args=(ch, method, body)).start()
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
这看起来可行,但很麻烦我们确定ch
对象是线程安全的吗?此外,假设long_running_task()
正在使用该连接参数将任务添加到新队列(即这个漫长过程的第一部分已经完成,让我们把任务发送到第二部分)。因此,线程使用connection
对象。这条线安全吗?
更重要的是,做这件事的首选方法是什么?我觉得这是非常混乱的,可能不是线程安全的,所以也许我们没有这样做的权利。谢谢你,谢谢
7条答案
按热度按时间eulz3vhy1#
现在,你最好的办法是关闭heartbeats,这将防止RabbitMQ关闭连接,如果你阻塞太长时间。我正在试验pika的核心连接管理和在后台线程中运行的IO循环,但它还不够稳定,无法发布。
在pika v1.1.0中,这是
ConnectionParameters(heartbeat=0)
4uqofj5v2#
请不要禁用心跳!
从Pika
0.12.0
开始,请使用this example code中描述的技术在单独的线程上运行长时间运行的任务,然后确认来自该线程的消息。c6ubokkw3#
我遇到了和你一样的问题。
我的解决方案是:
1.在服务器端关闭心跳
1.评估任务可能花费的最长时间
1.将客户端心跳超时设置为从步骤2获得的时间
为什么?
正如我用以下案例测试的那样:
案例一
1.服务器心跳打开,19世纪
1.客户端未设置
我仍然得到错误时,任务运行了很长时间-- >1800
案例二
1.关闭服务器心跳
1.关闭客户端检测信号
客户端没有错误,除了一个问题--当客户端崩溃时(我的操作系统在某些故障下重新启动),在Rabbitmq管理插件上仍然可以看到tcp连接。这是令人困惑。
案三
1.关闭服务器心跳
1.打开客户端心跳,将其设置为可预见的最大运行时间
在这种情况下,我可以动态地改变每个客户端上的每一个心跳。事实上,我在经常崩溃的机器上设置了心跳。而且,我可以通过Rabbitmq管理插件看到离线机器。
环境
操作系统:Centos x86_64
鼠兔:0.9.13
rabbitmq:3.3.1
yv5phkfx4#
1.您可以在
long_running_task(connection)
中定期调用connection.process_data_events()
,该函数将在调用时向服务器发送heartbeat,并使pika客户端远离关闭。1.在pika
BlockingConnection
中将心跳值设置为大于callconnection.process_data_events()
周期。bvhaajcl5#
不要禁用心跳。
最好的解决方案是在一个单独的线程中运行该任务,并将
prefetch_count
设置为1
,这样使用者只会收到一条未确认的消息,类似于channel.basic_qos(prefetch_count=1)
hsvhsicv6#
您还可以设置一个新线程,在这个新线程中处理消息,并在此线程处于活动状态时在连接上调用
.sleep
,以防止丢失心跳。下面是一个取自github中@gmr的示例代码块,以及一个指向该问题的链接以供将来参考。网址:https://github.com/pika/pika/issues/930#issuecomment-360333837
nnsrf1az7#
这里有一个更简单的方法来处理这个线程。如果消费者应用程序在当前作业完成之前不应消费另一个作业,则特别有用。ACK可以在任何时候发送-在本例中,我选择仅在作业完成时(线程不再活动)发送它。
在自己的线程中启动长时间运行的进程,然后通过调用channel.process_data_events()在循环中监视该线程。在主线程中保留对连接对象的引用,因为它不是线程安全的。基本上: