我正在尝试使用Prefect 2来配置Windows上一些任务的执行。
我经常遇到一些问题,可能需要手动修复并重新运行它。在此之后,我希望其他任务等到成功执行第一个任务,然后才启动,而不必手动执行失败的计划。正如我在下面的代码中所做的,task_that_should_wait
等待task_that_fails
完成,而不管它是成功还是失败。它们由一个流为每个流调度,使用单独流的原因是因为它们被调度为名义上在不同的时间执行。
from prefect import flow, task
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
@task
def task_that_fails():
raise(Exception("Failed here")) # Imagine this is a bug, that I'll fix manually
return True
@flow
def flow_that_fails():
task_that_fails()
deploy_that_fails = Deployment.build_from_flow(
name="task_that_fails",
flow=flow_that_fails,
schedule=CronSchedule(cron="36 21 * * MON,TUE,WED,THU,FRI", timezone='US/Central'),
)
deploy_that_fails.apply()
@task
def task_that_should_wait():
print("hey!")
return True
@flow
def flow_that_waits():
task_that_should_wait(wait_for=['task_that_fails'])
deploy_that_waits = Deployment.build_from_flow(
name="deploy_that_waits",
flow=flow_that_waits,
schedule=CronSchedule(cron="35 21 * * MON,TUE,WED,THU,FRI", timezone='US/Central'),
)
deploy_that_waits.apply()
字符串
如何将task_that_should_wait
配置为实际等待task_that_fails
成功完成?这些都是用于耗时很长且具有多个任务依赖的进程的 prop ,因此认为task_that_waits在task_that_fails之后没有必要运行,所以我也需要避免将它们变成单个任务或流。
1条答案
按热度按时间mfuanj7w1#
我不确定我是否完全理解了,但我认为一种方法是通过任务运行并发限制。
您可以在部署时将两个任务都标记为标记(即
waiting-tag
),并将其设置为1,以便在给定时间内只能执行其中一个任务,并且它们必须等待对方完成。然后,可以通过流和/或任务以编程方式更改并发限制,以创建更复杂的策略。