我有一个函数,它可以动态地从一个数据库中生成dag,这个数据库有dag的配置(我知道,这样做是很昂贵的)。问题是,它只在我在定义它的同一个文件中调用这个函数时生成dag,如果我导入到另一个文件中并执行它,它不会生成我的dag。
例如:
def generate_dags_dinamically():
dags = get_dag_configs()
# this 'dags' variable, contains some configs for generating the dags
for dag in dags:
# Defines dag and adds to globals
with DAG(
dag_id=dag.dag_id,
tags=dag.configs['tags'],
start_date=dag.start,
schedule_interval=dag.schedule,
default_args={'owner': dag.configs['owner']},
catchup=False
) as cur_dag:
globals()[dag.dag_id] = cur_dag
task_start = EmptyOperator(
task_id='task_start',
dag=cur_dag
)
task_end = EmptyOperator(
task_id='task_end',
dag=cur_dag
)
python_task = PythonOperator(
task_id=dag.task_id,
python_callable=dag.callable,
op_kwargs=dag.kwargs
retries=dag.task_retries
)
task_start >> python_task >> task_end
# When I call here, at the same file, airflow creates the dags.
generate_dags_dinamically()
但是如果我导入另一个文件,并调用该函数,它不会创建dags。
from dags.dynamic_dags import generate_dags_dinamically
# This wont create my dags!
generate_dags_dinamically()
所以我不知道该怎么解决这个问题。也许这和全局范围有关?
(我有一些原因不调用同一个文件,如文件夹结构模式,可重用性等)
1条答案
按热度按时间mzsu5hc01#
Airflow只会“看到”全局名称空间中的dag对象。为了纠正您的代码,您的
generate_dags_dinamically()
函数应该返回一个dag对象列表,然后您应该将它们添加到全局范围中,如下所示:请参阅文档中类似的代码。