我已经创建了一个celery 链,它由一组任务和普通任务组成,如下所示
@app.task
def task_initial(id):
# do something
print(id)
@app.task
def task_to_group(id):
# do something
# raise exception
try:
print(id)
except Exception:
raise
@app.task
def task_final(id):
# do something
# update the status (end process)
if id == 1:
id = 2
print(id)
@app.task
def task_error(id):
# do something
# handle the failure. update status in db to failed.
if id == 1:
id = 2
print(id)
已将画布创建为:
id = 1 # this some primary key id from db.
chunk = [
task_to_group.si(id).on_error(task_error.s(id))
for id in range(0,5)
]
c = chain(
task_initial.si(id)
group(chunk),
task_final.si(id)
).on_error(task_error.s(id))
c.apply_async()
所以,基本上我面临的问题是,如果task_to_group
中的一个任务失败/引发异常,则该任务上的on_error
被调用,但在链的最后,下一个要执行的任务应该是task_final
或on_error
或chain
,则应该调用task_error
。但他们都没有被召唤。我还验证了这些任务都没有在队列中等待执行。
我想要的是,如果组中的任何任务失败,该组的task_error
应该被调用,并继续运行组中的其余任务。分组完成后,应该调用task_error
上的下一个任务task_final
。
1条答案
按热度按时间66bbxpm51#
在当前实现中,为组中的任务指定的on_error回调不会被触发,因为任务组中引发的异常不会自动传播到组本身。
试试这个密码-