我想根据列表创建任务。例如:task_1 >> task_2 >> task_3
基于列表[1,2,3]。其思想是每个任务都应该触发一个外部DAG。DAG_A应触发DAG_B启动,一旦DAG_B中的所有任务完成,则DAG_A中的下一个任务应启动。它应该等待DAG_B中的最后一个任务成功,然后再触发DAG_A中的下一个任务。
要创建任务,以下是我当前的解决方案。
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
dag = DAG(
"MY_DAG",
start_date=datetime(2023, 1, 1),
schedule="@daily",
catchup=False
)
def ex_func_airflow(i):
print(i)
tabs = [1, 2, 3]
for i in tabs:
task_id = f'task_tab_{i}'
task = PythonOperator(
task_id=task_id,
op_args=[i],
python_callable=ex_func_airflow,
dag=dag
)
task
更新:
我尝试了以下方法,但当wait_task
启动时,它会保持运行,并且不会触发DAG_A中的task_2
。DAG_B是TEST_DAG,它具有必须在DAG_A中的task_2
启动之前完成的任务。但是wait_task
从不触发DAG_A中的task_2运行。
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
dag = DAG(
"MY_DAG",
start_date=datetime(2023, 1, 1),
schedule="@daily",
catchup=False
)
trigger_operator = TriggerDagRunOperator(
task_id='trigger_operator',
trigger_dag_id='TEST_DAG',
)
def ex_func_airflow(i):
print(i)
tabs = [1, 2, 3]
previous_task = None
for i in tabs:
task_id = f'task_tab_{i}'
task = PythonOperator(
task_id=task_id,
op_args=[i],
python_callable=ex_func_airflow,
dag=dag
)
if previous_task:
wait_task = ExternalTaskSensor(
task_id=f'wait_task_{i}',
external_dag_id="EXTERNAL_DAG_ID",
external_task_id=f'external_task_{i}',
dag=dag
)
trigger_operator >> previous_task >> wait_task >> task
else:
task
previous_task = task
1条答案
按热度按时间qxsslcnc1#
要确保每个任务都触发一个外部BAG,并在移动到下一个任务之前等待其完成,可以使用ExternalTaskSensor操作符。此操作员等待另一天的特定外部任务完成后再继续。
你是这个意思还是别的?