监控Django中的celery任务

bjg7j2ky  于 2023-04-22  发布在  Go
关注(0)|答案(1)|浏览(122)

我在我的Django应用程序中使用celery-resultscelery-beats。我在我的前端使用React框架,我想创建一个组件来监视所有celery任务中的所有状态,但celery-beats只显示状态:STARTED,FAILURE,SUCCESS.我想要的是在我的数据库中显示所有可能的状态.我正在使用Flower,它工作正常,但我只需要将所有状态保存到我的PostgreSQL数据库中.

问题

最好的方法是什么?我知道TaskResults只是存储已完成的任务,但如何扩展此库以将所有状态存储到我的数据库?我配置了celery以显示STARTED状态,但这还不够。我需要所有可能的状态,特别是RECEIVED。

a14dhokn

a14dhokn1#

以下是如何在Django中监控celery 任务的执行并将其状态存储在数据库中。
首先,你需要一个合适的模型来存储任务的执行。让我们称之为任务日志:

class TaskLog(models.Model)
    ...

然后,您可以使用celery signals(不同于Django信号)在执行新任务或达到特定状态时运行代码,并相应地更新数据库。
例如,您可以使用信号task_successfultask_failuretask_retry将任务执行的完成及其各自的结束状态注册为新的TaskLog对象。

from celery import signals
from .models import TaskLog

@signals.task_success.connect
def task_success_handler(sender=None, result=None, **kw):
    TaskLog.objects.create_from_success_signal(sender=sender, result=result)

@signals.task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, **kw):
    TaskLog.objects.create_from_failure_signal(sender, task_id, exception)

@signals.task_retry.connect
def task_retry_handler(sender=None, request=None, reason=None, **kw):
    TaskLog.objects.create_from_retry_signal(sender=sender, request=request, reason=reason)

有额外的信号来覆盖其他状态(例如task_internal_error)或收集额外的数据,例如测量任务持续时间。

相关问题