如何关闭连接在celery 避免'客户端意外关闭TCP连接'警告在RabbitMQ

ygya80vv  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(170)

当运行以下简单的celery任务时,我总是在RabbitMQ日志输出中收到“客户端意外关闭TCP连接”警告。

from celery import Celery
import datetime, time
app = Celery('tasks',
            broker='amqp://guest@localhost/my_vhost'
            )

@app.task(ignore_result=True)
def foobar(name):
    with open('mylog.log', 'a') as logfile:
        try:
            nowstr = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
            logfile.write('[%s] start\n' % nowstr)
            logfile.flush()
            time.sleep(5)
            logfile.write(name + '\n')
            logfile.write('ended\n')
        except Exception as e:
            logfile.write('Exception: %s' % e)
        logfile.flush()

if __name__ == '__main__':
    foobar.delay('foobar')

我在一个docker容器中运行RabbitMQ,我从下面的docker-compose开始。

version: '2'
services:
  rabbitmq:
    image: rabbitmq
    environment:
      - RABBITMQ_DEFAULT_VHOST=my_vhost
    ports: 
      - "5672:5672"

我开始的那个工人

celery -f celery.log -A sandbox worker --concurrency 1

如何正确关闭连接以避免这些警告?

ryoqjall

ryoqjall1#

我假设问题是写入文件是一个阻塞操作,这导致了这个问题,因为RabbitMQ会轮询容器的状态。

from celery import Celery
import multiprocessing
import datetime, time

app = Celery('tasks', broker='amqp://guest@localhost/my_vhost')

def _write_logs(name):
    with open('mylog.log', 'a') as logfile:
        try:
            nowstr = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
            logfile.write('[%s] start\n' % nowstr)
            logfile.flush()
            time.sleep(5)
            logfile.write(name + '\n')
            logfile.write('ended\n')
        except Exception as e:
            logfile.write('Exception: %s' % e)
        logfile.flush()

@app.task(ignore_result=True)
def foobar(name):
    multiprocessing.Process(target=_write_logs, args=(name, )).start()

if __name__ == '__main__':
    foobar.delay('foobar')

编辑:如果上面的解决方案没有帮助你,很可能你面临着内存泄漏问题。

相关问题