我有一个DAG计划每天运行12次,但有时手动触发的次数超过了12次。我想让DAG知道它是否是一个预定的触发器,如果是,如果它是一天中最后一次预定的运行,那么它应该做其他事情。我如何在DAG中确定它是否是一天中最后一个计划的DAG(对于该特定DAG ID)?谢谢!
fhg3lkii1#
在Airflow中没有任何开箱即用的东西来告诉你这是否是“一天中最后一个预定的间隔”。但是,您可以自己实现这种行为。假设你的cron时间表是"0 */2 * * *"。一天的最后一个时间表将是从22:00:00到00:00:00(第二天)。我们可以使用这些信息来给予不同的行为,例如:
"0 */2 * * *"
22:00:00
00:00:00
import datetime import pendulum from airflow.decorators import dag, task from airflow.exceptions import AirflowSkipException from airflow.models import DagRun from airflow.utils.types import DagRunType @dag(schedule="0 */2 * * *", start_date=datetime.datetime(2023, 6, 20)) def so_76559726(): @task def check_last_run_of_day( data_interval_start: pendulum.DateTime = None, data_interval_end: pendulum.DateTime = None, dag_run: DagRun = None, ): print(f"The interval start is {data_interval_start}") print(f"The interval end is {data_interval_end}") print(f"The run type is {dag_run.run_type}") if (data_interval_end.date() - data_interval_start.date()).days == 1 and dag_run.run_type == DagRunType.SCHEDULED: raise AirflowSkipException("Skip the last interval of the day.") check_last_run_of_day() so_76559726()
在此DAG中,当间隔的结束日期在第二天时,条件if (data_interval_end.date() - data_interval_start.date()).days == 1为True,条件dag_run.run_type == DagRunType.SCHEDULED仅在计划的DAG运行中有效。为了使其易于识别,我在任务中引发了AirflowSkipException,但您可以在这种情况下实现任何您希望的行为:
if (data_interval_end.date() - data_interval_start.date()).days == 1
True
dag_run.run_type == DagRunType.SCHEDULED
1条答案
按热度按时间fhg3lkii1#
在Airflow中没有任何开箱即用的东西来告诉你这是否是“一天中最后一个预定的间隔”。但是,您可以自己实现这种行为。
假设你的cron时间表是
"0 */2 * * *"
。一天的最后一个时间表将是从22:00:00
到00:00:00
(第二天)。我们可以使用这些信息来给予不同的行为,例如:在此DAG中,当间隔的结束日期在第二天时,条件
if (data_interval_end.date() - data_interval_start.date()).days == 1
为True
,条件dag_run.run_type == DagRunType.SCHEDULED
仅在计划的DAG运行中有效。为了使其易于识别,我在任务中引发了AirflowSkipException,但您可以在这种情况下实现任何您希望的行为: