InterfaceError: connection already closed (using django + celery + Scrapy)

Asked by ig9co6j1 on 2022-11-09

I get this error when running a Scrapy parsing function (which sometimes takes up to 10 minutes) inside a Celery task.
I use: Django==1.6.5, django-celery==3.1.16, celery==3.1.16, psycopg2==2.5.5 (I also tried psycopg2==2.5.4).

[2015-07-19 11:27:49,488: CRITICAL/MainProcess] Task myapp.parse_items[63fc40eb-c0d6-46f4-a64e-acce8301d29a] INTERNAL ERROR: InterfaceError('connection already closed',)
Traceback (most recent call last):
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/celery/app/trace.py", line 284, in trace_task
    uuid, retval, SUCCESS, request=task_request,
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/celery/backends/base.py", line 248, in store_result
    request=request,**kwargs)
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/djcelery/backends/database.py", line 29, in _store_result
    traceback=traceback, children=self.current_task_children(request),
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/djcelery/managers.py", line 42, in _inner
    return fun(*args,**kwargs)
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/djcelery/managers.py", line 181, in store_result
    'meta': {'children': children}})
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/djcelery/managers.py", line 87, in update_or_create
    return get_queryset(self).update_or_create(**kwargs)
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/djcelery/managers.py", line 70, in update_or_create
    obj, created = self.get_or_create(**kwargs)
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py", line 376, in get_or_create
    return self.get(**lookup), False
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py", line 304, in get
    num = len(clone)
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py", line 77, in __len__
    self._fetch_all()
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py", line 857, in _fetch_all
    self._result_cache = list(self.iterator())
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py", line 220, in iterator
    for row in compiler.results_iter():
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 713, in results_iter
    for rows in self.execute_sql(MULTI):
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 785, in execute_sql
    cursor = self.connection.cursor()
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/backends/__init__.py", line 160, in cursor
    cursor = self.make_debug_cursor(self._cursor())
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/backends/__init__.py", line 134, in _cursor
    return self.create_cursor()
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/utils.py", line 99, in __exit__
    six.reraise(dj_exc_type, dj_exc_value, traceback)
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/backends/__init__.py", line 134, in _cursor
    return self.create_cursor()
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/backends/postgresql_psycopg2/base.py", line 137, in create_cursor
    cursor = self.connection.cursor()
InterfaceError: connection already closed

jk9hmnmh1#

Unfortunately this is a known problem with the django + psycopg2 + celery combination. It is an old issue that was never fixed.
See this thread for background: https://github.com/celery/django-celery/issues/121
Basically, when celery starts a worker it forks, inheriting the database connection from the django.db framework. If that connection drops for any reason, a new one is not created. Celery itself has nothing to do with this, because there is no way to detect the dropped connection through the django.db API: Django gives no notification when it happens, since it simply opens a connection per wsgi call (there is no connection pooling). I hit the same problem in a large production environment with many worker machines, and sometimes those machines lost connectivity to the postgres server.
I solved it by putting each celery master process under a linux supervisord handler and a watcher, and implementing a decorator that handles psycopg2.InterfaceError; when it occurs, the function dispatches a system call that forces supervisord to gracefully restart the celery process with SIGINT.
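The decorator described above is not shown in the answer; a minimal sketch of the idea might look like the following (the name `restart_on` and the injectable `kill` parameter are my own assumptions, added so the behaviour can be exercised without actually killing the process):

```python
import functools
import os
import signal


def restart_on(exc_type, kill=os.kill):
    """Hypothetical sketch: if exc_type (e.g. psycopg2.InterfaceError)
    escapes the wrapped task, send SIGINT to this process so that
    supervisord notices the exit and respawns the worker.
    `kill` defaults to os.kill and is injectable for testing."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except exc_type:
                # Ask for a graceful shutdown; supervisord restarts us.
                kill(os.getpid(), signal.SIGINT)
                raise  # still surface the failure to celery
        return wrapper
    return decorator
```

This only papers over the dropped connection by recycling the whole worker; the base-class approach in the edit below the original answer attacks the root cause instead.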
Edit:
I found a better solution. I implemented a celery task base class like this:

from django.db import connection
import celery

class FaultTolerantTask(celery.Task):
    """ Implements the after_return hook to close the invalid connection.
    This way, django is forced to serve a new connection for the next
    task.
    """
    abstract = True

    def after_return(self, *args, **kwargs):
        connection.close()

@celery.task(base=FaultTolerantTask)
def my_task():
    pass  # my database dependent code here

I believe it will solve your problem too.
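The mechanism can be illustrated without Celery or Django installed. Below is a stand-in (all names here are illustrative, not part of any real API) where an `after_return`-style hook closes the connection after every call, so the next call is forced to open a fresh one, exactly as `django.db` does when `connection.close()` has been called:

```python
class FakeConnection:
    """Stand-in for a database connection; close() marks it unusable."""
    _next_id = 0

    def __init__(self):
        FakeConnection._next_id += 1
        self.id = FakeConnection._next_id
        self.closed = False

    def close(self):
        self.closed = True


class ConnectionHolder:
    """Mimics django.db.connection: lazily reopens after close()."""
    def __init__(self):
        self._conn = None

    def get(self):
        if self._conn is None or self._conn.closed:
            self._conn = FakeConnection()  # fresh connection on demand
        return self._conn

    def close(self):
        if self._conn is not None:
            self._conn.close()


connection = ConnectionHolder()


class FaultTolerantTask:
    """Minimal analogue of the celery.Task subclass above: the
    after_return hook runs after every call and closes the connection."""
    def __call__(self, *args, **kwargs):
        try:
            return self.run(*args, **kwargs)
        finally:
            self.after_return()

    def after_return(self, *args, **kwargs):
        connection.close()


class MyTask(FaultTolerantTask):
    def run(self):
        return connection.get().id  # which connection did this call use?
```

Each invocation sees a new connection id, showing that a connection left closed (or dropped) between tasks can never be reused by the next one.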


blpfk2vs2#

Guys,
I was having the same problem; I have now updated my code and created a new loader for celery:

from djcelery.loaders import DjangoLoader
from django import db

class CustomDjangoLoader(DjangoLoader):
    def on_task_init(self, task_id, task):
        """Called before every task."""
        for conn in db.connections.all():
            conn.close_if_unusable_or_obsolete()
        super(CustomDjangoLoader, self).on_task_init(task_id, task)

Of course, if you are using djcelery, this also requires the following in settings:

CELERY_LOADER = 'myproject.loaders.CustomDjangoLoader'
os.environ['CELERY_LOADER'] = CELERY_LOADER

I still have to test it; I will post an update.


cgfeq70w3#

If you are hitting this while running tests, you can either change the test to use the TransactionTestCase class instead of TestCase, or add the mark pytest.mark.django_db(transaction=True). This kept my database connection alive from the creation of the pytest-celery fixtures through to the database calls.
Github issue: https://github.com/Koed00/django-q/issues/167
For context: I am using pytest-celery with the celery_app and celery_worker fixtures in my tests, and I was also trying to hit the test database inside the tasks referenced by those tests.
If someone could explain why switching to transaction=True keeps the connection open, that would be great!
