rabbitmq 有没有可能等到celery 组完成?

w3nuxt5m  于 12个月前  发布在  RabbitMQ
关注(0)|答案(2)|浏览(163)

我正在尝试做组任务,并等待,直到所有的组子任务完成,然后运行最后一个任务。但是当我调用task时,它调用group和last tasks,但是在group finish之前完成的最后一个任务。是否可以等到组内的所有任务完成?

@shared_task(name="print")
def print_order():
    print("PRINT #1")
    mylist = [(1, 2), (4, 6), (1, 4)]
    group([(add.s(*i) | order_id_print.s()) for i in mylist]).delay()

@shared_task(name="print.add")
def add(x,y):
    print("ADD #2")
    chain(add_task1.s(x, y, 'task id') | add_task2.si(x, y, "task_id")).delay()
    return x+y

@shared_task(name="add_task_1")
def add_task1(order_id, ftype, task_id):
    print("ADD task #2-1")
    print("add tasks task1 order_id {} {} {}".format(order_id, ftype, task_id))

@shared_task(name="add_task_2")
def add_task2(order_id, ftype, task_id):
    print("ADD task #2-2")
    print("add tasks task2 order_id {} {} {}".format(order_id, ftype, task_id))

@shared_task(name="print.order_id_print")
def order_id_print(id):
    print("ORDER #3")
    print("order id is {}".format(id))
fnvucqvd

fnvucqvd1#

你可能想要的是一个和弦而不是一个组合。chord是一个任务,它只在组中的所有任务都完成执行后才执行。
看看文档:
https://docs.celeryproject.org/en/latest/userguide/canvas.html#chords

6ie5vjzr

6ie5vjzr2#

为此,有两种选择:* 删除“group”和“chord”(Chord不适用于rpc)**。两者都是celery结构,允许您并行运行多个任务,然后等待所有任务完成。
使用chord,可以定义一个“回调"任务,并异步地完成整个过程。如果你的服务器在返回客户端之前需要等待任务,那么组可能是最好的选择。

集团:

tasks = [get_foods.s(x,y), get_drinks.s(z,w)]
results = group(tasks)().get() #lock until everything done
for result in results:
    print(result)

和弦:

chord([get_foods.s(x,y), get_drinks.s(x,y)], finish_order.s()).delay()

相关问题