python-3.x 动态生成气流障碍的功能,而不是创建气流障碍

xxhby3vn  于 2022-11-26  发布在  Python
关注(0)|答案(1)|浏览(112)

我有一个函数,它可以动态地从一个数据库中生成dag,这个数据库有dag的配置(我知道,这样做是很昂贵的)。问题是,它只在我在定义它的同一个文件中调用这个函数时生成dag,如果我导入到另一个文件中并执行它,它不会生成我的dag。
例如:

def generate_dags_dinamically():
    dags = get_dag_configs()
    # this 'dags' variable, contains some configs for generating the dags
    for dag in dags:

        # Defines dag and adds to globals                
        with DAG(
            dag_id=dag.dag_id,
            tags=dag.configs['tags'],
            start_date=dag.start,
            schedule_interval=dag.schedule,
            default_args={'owner': dag.configs['owner']},
            catchup=False
        ) as cur_dag:

            globals()[dag.dag_id] =  cur_dag

            task_start = EmptyOperator(
                task_id='task_start',
                dag=cur_dag
            )
            task_end = EmptyOperator(
                task_id='task_end',
                dag=cur_dag
            )

            python_task = PythonOperator(
                task_id=dag.task_id,
                python_callable=dag.callable,
                op_kwargs=dag.kwargs
                retries=dag.task_retries
            )

            task_start >> python_task >> task_end

# When I call here, at the same file, airflow creates the dags.
generate_dags_dinamically()

但是如果我导入另一个文件,并调用该函数,它不会创建dags。

from dags.dynamic_dags import generate_dags_dinamically

# This wont create my dags!
generate_dags_dinamically()

所以我不知道该怎么解决这个问题。也许这和全局范围有关?
(我有一些原因不调用同一个文件,如文件夹结构模式,可重用性等)

mzsu5hc0

mzsu5hc01#

Airflow只会“看到”全局名称空间中的dag对象。为了纠正您的代码,您的generate_dags_dinamically()函数应该返回一个dag对象列表,然后您应该将它们添加到全局范围中,如下所示:

from dags.dynamic_dags import generate_dags_dinamically

dags_list = generate_dags_dinamically()
for dag in dags_list:
    globals()[dag.dag_id] = dag

请参阅文档中类似的代码。

相关问题