python 如何运行简单的气流DAG

7y4bm7vi  于 2023-01-04  发布在  Python
关注(0)|答案(3)|浏览(249)

我对Airflow是个新手。我想在指定的日期运行一个简单的DAG。我很难区分开始日期、执行日期和回填。运行DAG的命令是什么?
以下是我后来的尝试:

airflow run dag_1 task_1 2017-1-23

我第一次运行该命令时,任务执行正确,但当我再次尝试时,它不起作用。
下面是我运行的另一个命令:

airflow backfill dag_1 -s 2017-1-23 -e 2017-1-24

我不知道这个命令会产生什么结果。DAG会在每天23到24时执行吗?
在运行上面的两个命令之前,我执行了以下操作:

airflow initdb
airflow scheduler 
airflow webserver -p 8085 --debug &

这是我的DAG

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 1, 23, 12),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'dag_1', default_args=default_args, schedule_interval=timedelta(1))

t1 = BashOperator(
    task_id='create_clients',
    bash_command='Rscript /scripts/Cli.r',
    dag=dag)

t2 = BashOperator(
    task_id='create_operation',
    bash_command='Rscript Operation.r',
    retries=3,
    dag=dag)

t2.set_upstream(t1)

截图:Tree View

    • 更新**
airflow run dag_1 task_1 2017-1-23T10:34
kpbwa7wx

kpbwa7wx1#

如果您使用

airflow run dag_1 task_1 2017-1-23

运行已保存,再次运行不会执行任何操作。您可以尝试通过强制运行来重新运行它

airflow run --force=true dag_1 task_1 2017-1-23

气流回填命令将运行在从开始日期到结束日期的指定时间段内运行的任何执行。这将取决于您在DAG上设置的计划,如果您将其设置为每小时触发一次,则应运行24次,但它也不会重新执行以前执行的运行。
您可以清除该任务,就像它从未运行过一样

airflow clear dag_1 -s 2017-1-23 -e 2017-1-24

另请查看此处的cli文档:https://airflow.incubator.apache.org/cli.html

mrwjdhj3

mrwjdhj32#

开始日期、执行日期和回填之间的差异
执行回填以显式运行DAG以测试/手动运行DAG/重新运行出错的DAG。您可以使用CLI执行此操作

airflow backfill -s <<start_date>> <<dag>> 
#optionally provide -1 as start_date to run it immediately

顾名思义,start_date是DAG定义有效的起始日期

execution_date是运行它的日期-时间。这是在测试DAG的各个任务时提供的,如下所示

airflow test <<dag>> <<task>> <<exec_date>>

运行dag的命令是什么

回填是显式运行DAG的命令。否则,您只需将DAG放在DAGBAG文件夹中,调度程序将按照DAG定义中定义的调度运行它

airflow backfill -s <<start_date>> <<dag>> 
#optionally provide -1 as start_date to run it immediately
gorkyyrv

gorkyyrv3#

对于更新版本的Airflow,应使用airflow tasks run
例如:airflow tasks run dag_1 task_1 2023-1-3

相关问题