python-3.x 等待Prefect上一个流成功执行

eyh26e7m  于 2023-08-08  发布在  Python
关注(0)|答案(1)|浏览(132)

我正在尝试使用Prefect 2来配置Windows上一些任务的执行。
我经常遇到一些问题,可能需要手动修复并重新运行它。在此之后,我希望其他任务等到成功执行第一个任务,然后才启动,而不必手动执行失败的计划。正如我在下面的代码中所做的,task_that_should_wait等待task_that_fails完成,而不管它是成功还是失败。它们由一个流为每个流调度,使用单独流的原因是因为它们被调度为名义上在不同的时间执行。

from prefect import flow, task
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
    

@task
def task_that_fails():
    raise(Exception("Failed here")) # Imagine this is a bug, that I'll fix manually
    return True

    
@flow
def flow_that_fails():
    task_that_fails()

deploy_that_fails = Deployment.build_from_flow(
    name="task_that_fails",
    flow=flow_that_fails,
    schedule=CronSchedule(cron="36 21 * * MON,TUE,WED,THU,FRI", timezone='US/Central'),
)

deploy_that_fails.apply()
    
@task
def task_that_should_wait():
    print("hey!")
    return True

    
@flow
def flow_that_waits():
    task_that_should_wait(wait_for=['task_that_fails'])

deploy_that_waits = Deployment.build_from_flow(
    name="deploy_that_waits",
    flow=flow_that_waits,
    schedule=CronSchedule(cron="35 21 * * MON,TUE,WED,THU,FRI", timezone='US/Central'),
)

deploy_that_waits.apply()

字符串
如何将task_that_should_wait配置为实际等待task_that_fails成功完成?这些都是用于耗时很长且具有多个任务依赖的进程的 prop ,因此认为task_that_waits在task_that_fails之后没有必要运行,所以我也需要避免将它们变成单个任务或流。

mfuanj7w

mfuanj7w1#

我不确定我是否完全理解了,但我认为一种方法是通过任务运行并发限制。
您可以在部署时将两个任务都标记为标记(即waiting-tag),并将其设置为1,以便在给定时间内只能执行其中一个任务,并且它们必须等待对方完成。
然后,可以通过流和/或任务以编程方式更改并发限制,以创建更复杂的策略。

相关问题