django 如果组失败,则组和任务的Celery链不会调用下一个任务

np8igboo  于 2023-06-07  发布在  Go
关注(0)|答案(1)|浏览(165)

我已经创建了一个celery 链,它由一组任务和普通任务组成,如下所示

@app.task
def task_initial(id):
   # do something
   print(id)
@app.task
def task_to_group(id):
   # do something
   # raise exception
   try:   
      print(id)
   except Exception:
      raise
@app.task
def task_final(id):
   # do something
   # update the status (end process)
   if id == 1:
      id = 2
   print(id)
@app.task
def task_error(id):
   # do something
   # handle the failure. update status in db to failed.
   if id == 1:
      id = 2
   print(id)

已将画布创建为:

id = 1  # this some primary key id from db.
chunk = [
          task_to_group.si(id).on_error(task_error.s(id)) 
          for id in range(0,5)
        ]
c = chain(
      task_initial.si(id)
      group(chunk),
      task_final.si(id)
    ).on_error(task_error.s(id))
c.apply_async()

所以,基本上我面临的问题是,如果task_to_group中的一个任务失败/引发异常,则该任务上的on_error被调用,但在链的最后,下一个要执行的任务应该是task_finalon_errorchain,则应该调用task_error。但他们都没有被召唤。我还验证了这些任务都没有在队列中等待执行。
我想要的是,如果组中的任何任务失败,该组的task_error应该被调用,并继续运行组中的其余任务。分组完成后,应该调用task_error上的下一个任务task_final

66bbxpm5

66bbxpm51#

在当前实现中,为组中的任务指定的on_error回调不会被触发,因为任务组中引发的异常不会自动传播到组本身。
试试这个密码-

from celery import chain, group, chord

@app.task
def task_initial(id):
   # do something
   print(id)

@app.task
def task_to_group(id):
   # do something
   # raise exception
   try:
      print(id)
   except Exception:
      raise

@app.task
def task_final(id):
   # do something
   # update the status (end process)
   if id == 1:
      id = 2
   print(id)

@app.task
def task_error(id):
   # do something
   # handle the failure. update status in db to failed.
   if id == 1:
      id = 2
   print(id)

def create_group(id):
   chunk = [
      task_to_group.si(i).on_error(task_error.s(i))
      for i in range(0, 5)
   ]
   return group(chunk)

id = 1  # some primary key id from the database

c = chain(
   task_initial.si(id),
   chord(create_group.si(id), body=task_final.s(id)).on_error(task_error.s(id))
)
c.apply_async()

相关问题