Celery worker's connection to RabbitMQ hits broken pipe errors in gevent or eventlet mode

wr98u20j  posted on 2022-11-08 in RabbitMQ

The Celery worker's connection to RabbitMQ hits broken pipe errors when the worker runs in gevent mode. There is no problem when the Celery worker runs with the process (prefork) pool (no gevent, no monkey patching).
After that, the Celery worker no longer receives any task messages from RabbitMQ until it is restarted.
The problem always happens when the Celery worker consumes task messages more slowly than the Django app produces them and around 3,000 messages have piled up in RabbitMQ.
Gevent version 1.1.0
Celery version 3.1.22
====== celery log ======

[2016-08-08 13:52:06,913: CRITICAL/MainProcess] Couldn't ack 293, reason:error(32, 'Broken pipe')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/kombu/message.py", line 93, in ack_log_error
    self.ack()
  File "/usr/local/lib/python2.7/site-packages/kombu/message.py", line 88, in ack
    self.channel.basic_ack(self.delivery_tag)
  File "/usr/local/lib/python2.7/site-packages/amqp/channel.py", line 1584, in basic_ack
    self._send_method((60, 80), args)
  File "/usr/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, in _send_method
    self.channel_id, method_sig, args, content,
  File "/usr/local/lib/python2.7/site-packages/amqp/method_framing.py", line 221, in write_method
    write_frame(1, channel, payload)
  File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 182, in write_frame
    frame_type, channel, size, payload, 0xce,
  File "/usr/local/lib/python2.7/site-packages/gevent/_socket2.py", line 412, in sendall
    timeleft = self.__send_chunk(chunk, flags, timeleft, end)
  File "/usr/local/lib/python2.7/site-packages/gevent/_socket2.py", line 351, in __send_chunk
    data_sent += self.send(chunk, flags)
  File "/usr/local/lib/python2.7/site-packages/gevent/_socket2.py", line 320, in send
    return sock.send(data, flags)
error: [Errno 32] Broken pipe

====== rabbitmq log ======

=ERROR REPORT==== 8-Aug-2016::14:28:33 ===
closing AMQP connection <0.15928.4> (10.26.39.183:60732 -> 10.26.39.183:5672):
{writer,send_failed,{error,enotconn}}

=ERROR REPORT==== 8-Aug-2016::14:29:03 ===
closing AMQP connection <0.15981.4> (10.26.39.183:60736 -> 10.26.39.183:5672):
{writer,send_failed,{error,enotconn}}

=ERROR REPORT==== 8-Aug-2016::14:29:03 ===
closing AMQP connection <0.15955.4> (10.26.39.183:60734 -> 10.26.39.183:5672):
{writer,send_failed,{error,enotconn}}

A similar problem also occurs when the Celery worker uses eventlet.

[2016-08-09 17:41:37,952: CRITICAL/MainProcess] Couldn't ack 583, reason:error(32, 'Broken pipe')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/kombu/message.py", line 93, in ack_log_error
    self.ack()
  File "/usr/local/lib/python2.7/site-packages/kombu/message.py", line 88, in ack
    self.channel.basic_ack(self.delivery_tag)
  File "/usr/local/lib/python2.7/site-packages/amqp/channel.py", line 1584, in basic_ack
    self._send_method((60, 80), args)
  File "/usr/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, in _send_method
    self.channel_id, method_sig, args, content,
  File "/usr/local/lib/python2.7/site-packages/amqp/method_framing.py", line 221, in write_method
    write_frame(1, channel, payload)
  File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 182, in write_frame
    frame_type, channel, size, payload, 0xce,
  File "/usr/local/lib/python2.7/site-packages/eventlet/greenio/base.py", line 385, in sendall
    tail = self.send(data, flags)
  File "/usr/local/lib/python2.7/site-packages/eventlet/greenio/base.py", line 379, in send
    return self._send_loop(self.fd.send, data, flags)
  File "/usr/local/lib/python2.7/site-packages/eventlet/greenio/base.py", line 366, in _send_loop
    return send_method(data, *args)
error: [Errno 32] Broken pipe

Added setup and load-test information.
We start the Celery workers with supervisor, using the following options:

celery worker -A celerytasks.celery_worker_init -Q default -P gevent -c 1000 --loglevel=info

Celery uses RabbitMQ as the broker.
We run 4 Celery worker processes by specifying "numprocs=4" in the supervisor configuration.
We use JMeter to simulate web-access load; the Django app produces tasks for the Celery workers to consume. The tasks basically hit a MySQL database to fetch or update some data.
According to the RabbitMQ web management page, tasks are produced at about 50/s while being consumed at about 20/s. After roughly 1 minute of load testing, the log files show many connections between RabbitMQ and Celery hitting broken pipe errors.
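For context, here is a rough sketch of what such a producer/consumer pair might look like. Every name in it (celery_app, Record, update_record, load_test_view, the broker URL) is a hypothetical stand-in, not the actual code behind celerytasks.celery_worker_init.

# Hypothetical sketch of the producer/consumer pair under load test;
# all names below are illustrative, not taken from the real project.
from celery import Celery
from django.http import HttpResponse

celery_app = Celery('celerytasks', broker='amqp://guest:guest@localhost:5672//')

@celery_app.task
def update_record(record_id, value):
    # The real tasks fetch/update rows in MySQL; here via the Django ORM.
    from myapp.models import Record   # hypothetical model backed by MySQL
    Record.objects.filter(pk=record_id).update(value=value)

def load_test_view(request):
    # Each JMeter request makes the Django app publish one task to RabbitMQ.
    update_record.delay(record_id=1, value=request.GET.get('value', ''))
    return HttpResponse('queued')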

Answer 1 (iezvtpos):

We noticed that this problem is also triggered by the combination of a high prefetch count and high concurrency.
We had concurrency set to 500 and the prefetch multiplier set to 100, which means the effective prefetch per worker process was 500 * 100 = 50,000. With about 100k tasks piled up, this configuration let a single worker process reserve all of the tasks for itself while the other workers sat idle. That one worker kept hitting broken pipe errors and never acknowledged any tasks, so the tasks were never cleared from the queue.
We then changed the prefetch multiplier to 3 and restarted all the workers, which resolved the problem. After lowering the prefetch we have not seen a single broken pipe error, whereas before the change we saw it frequently.
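For reference, in Celery 3.1 the prefetch multiplier is controlled by the CELERYD_PREFETCH_MULTIPLIER setting (renamed worker_prefetch_multiplier in Celery 4+). A minimal configuration sketch, assuming the app loads a celeryconfig.py module; the module name and broker URL are assumptions:

# celeryconfig.py -- hypothetical config module; adjust to your project layout.
# Effective prefetch per worker process is roughly concurrency (-c) times this
# multiplier: 500 * 100 = 50,000 with the old settings, 500 * 3 = 1,500 after
# the change, so one worker can no longer reserve the entire backlog.
BROKER_URL = 'amqp://guest:guest@rabbitmq-host:5672//'  # assumption: adjust host/credentials

CELERYD_PREFETCH_MULTIPLIER = 3  # Celery 3.x name; worker_prefetch_multiplier in 4.x+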
