python-3.x - How to create a task workflow in Airflow based on a list?

Posted by pinkon5k on 2023-06-07 in Python

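I want to create tasks based on a list. For example:
task_1 >> task_2 >> task_3, based on the list [1, 2, 3]. The idea is that each task should trigger an external DAG: DAG_A should trigger DAG_B, and once all the tasks in DAG_B have completed, the next task in DAG_A should start. It should wait for the last task in DAG_B to succeed before triggering the next task in DAG_A.
Here is my current solution for creating the tasks: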

from airflow import DAG
from airflow.operators.python import PythonOperator

from datetime import datetime

dag = DAG(
    "MY_DAG",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=False
)

def ex_func_airflow(i):
    print(i)

tabs = [1, 2, 3]

for i in tabs:
    task_id = f'task_tab_{i}'
    # One PythonOperator per list element; the tasks are not linked to each other yet
    task = PythonOperator(
        task_id=task_id,
        op_args=[i],
        python_callable=ex_func_airflow,
        dag=dag
    )

Update:

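I tried the following approach, but once wait_task starts it just keeps running and never lets task_2 in DAG_A start. DAG_B is TEST_DAG, whose tasks must finish before task_2 in DAG_A starts, but wait_task never hands over to task_2 in DAG_A.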

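from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor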

from datetime import datetime

dag = DAG(
    "MY_DAG",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=False
)

trigger_operator = TriggerDagRunOperator(
    task_id='trigger_operator',
    trigger_dag_id='TEST_DAG',
)

def ex_func_airflow(i):
    print(i)

tabs = [1, 2, 3]

previous_task = None

for i in tabs:
    task_id = f'task_tab_{i}'
    task = PythonOperator(
        task_id=task_id,
        op_args=[i],
        python_callable=ex_func_airflow,
        dag=dag
    )

    if previous_task:
        wait_task = ExternalTaskSensor(
            task_id=f'wait_task_{i}',
            external_dag_id="EXTERNAL_DAG_ID",
            external_task_id=f'external_task_{i}',  
            dag=dag
        )
        trigger_operator >> previous_task >> wait_task >> task
    else:
        task
    previous_task = task
Answer 1, by qxsslcnc:

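To make sure each task triggers an external DAG and waits for it to finish before moving on to the next task, you can use the ExternalTaskSensor operator. This operator waits for a specific task in another DAG to complete before continuing.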

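from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor

dag = DAG(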
    "MY_DAG",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False
)

def ex_func_airflow(i):
    print(i)

tabs = [1, 2, 3]

previous_task = None

for i in tabs:
    task_id = f'task_tab_{i}'
    task = PythonOperator(
        task_id=task_id,
        op_args=[i],
        python_callable=ex_func_airflow,
        dag=dag
    )
    
    if previous_task:
        wait_task = ExternalTaskSensor(
            task_id=f'wait_task_{i}',
            external_dag_id="EXTERNAL_DAG_ID",
            external_task_id=f'external_task_{i}',  
            dag=dag
        )
        previous_task >> wait_task >> task
    else:
        task
    previous_task = task

Is this what you meant, or something else?

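from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor

dag_a = DAG(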
    "DAG_A",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False
)

dag_b = DAG(
    "DAG_B",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False
)

def ex_func_airflow(i):
    print(i)

tabs = [1, 2, 3]

previous_task = None

for i in tabs:
    task_id = f'task_tab_{i}'
    
    task_a = PythonOperator(
        task_id=task_id,
        op_args=[i],
        python_callable=ex_func_airflow,
        dag=dag_a
    )
    
    if previous_task:
        trigger_dag_b = TriggerDagRunOperator(
            task_id=f'trigger_dag_b_{i}',
            trigger_dag_id="DAG_B",
            dag=dag_a
        )
        
        wait_dag_b = ExternalTaskSensor(
            task_id=f'wait_dag_b_{i}',
            external_dag_id="DAG_B",
            external_task_id=None,  
            mode='reschedule',  
            timeout=3600,  
            poke_interval=60,
            dag=dag_a
        )
        
        previous_task >> trigger_dag_b >> wait_dag_b >> task_a
    else:
        task_a
    
    previous_task = task_a
