python 气流:有一天知道当下一个预定的时间

ruoxqz4g  于 2023-06-28  发布在  Python
关注(0)|答案(1)|浏览(93)

我有一个DAG计划每天运行12次,但有时手动触发的次数超过了12次。我想让DAG知道它是否是一个预定的触发器,如果是,如果它是一天中最后一次预定的运行,那么它应该做其他事情。
我如何在DAG中确定它是否是一天中最后一个计划的DAG(对于该特定DAG ID)?
谢谢!

fhg3lkii

fhg3lkii1#

在Airflow中没有任何开箱即用的东西来告诉你这是否是“一天中最后一个预定的间隔”。但是,您可以自己实现这种行为。
假设你的cron时间表是"0 */2 * * *"。一天的最后一个时间表将是从22:00:0000:00:00(第二天)。我们可以使用这些信息来给予不同的行为,例如:

import datetime

import pendulum
from airflow.decorators import dag, task
from airflow.exceptions import AirflowSkipException
from airflow.models import DagRun
from airflow.utils.types import DagRunType

@dag(schedule="0 */2 * * *", start_date=datetime.datetime(2023, 6, 20))
def so_76559726():
    @task
    def check_last_run_of_day(
        data_interval_start: pendulum.DateTime = None,
        data_interval_end: pendulum.DateTime = None,
        dag_run: DagRun = None,
    ):
        print(f"The interval start is {data_interval_start}")
        print(f"The interval end is {data_interval_end}")
        print(f"The run type is {dag_run.run_type}")

        if (data_interval_end.date() - data_interval_start.date()).days == 1 and dag_run.run_type == DagRunType.SCHEDULED:
            raise AirflowSkipException("Skip the last interval of the day.")

    check_last_run_of_day()

so_76559726()

在此DAG中,当间隔的结束日期在第二天时,条件if (data_interval_end.date() - data_interval_start.date()).days == 1True,条件dag_run.run_type == DagRunType.SCHEDULED仅在计划的DAG运行中有效。
为了使其易于识别,我在任务中引发了AirflowSkipException,但您可以在这种情况下实现任何您希望的行为:

相关问题