手动触发DAG时,如何将可选参数传递给Airflow PythonOperator?

egdjgwm8  于 2023-01-16  发布在  Python
关注(0)|答案(1)|浏览(257)

我正尝试使用Trigger DAG w/ config可选参数传递给我计划手动触发的DAG。它使用PythonOperator
我遵循了answer by J.Fell,其中建议显式访问dag_run.conf传递的参数

param1="{{ dag_run.conf['param1'] }}"
param2="{{ dag_run.conf['param2'] }}"

当我触发运行带有可选参数(param2)的函数的DAG时,仅传递:{"param1":"value1"}
我得到一个错误:

UndefinedError: 'dict_object' has no attribute 'param2'

如何调整从dag_run.conf访问值,使可选参数不会抛出错误?

DAG代码示例

__version__ = "@version@"

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
import sys
import socket
import datetime
import my_function

file_name = my_function.get_file_name(__file__)

dag = DAG(
    dag_id = file_name,
    default_args=my_function.default_args,
    catchup=False,
    schedule_interval=None,
    tags = ['my_function']
)

# Parameters
param1="{{ dag_run.conf['param1'] }}"
param2="{{ dag_run.conf['param2'] }}"

custom_task = PythonOperator(
    task_id = "custom_task",
    dag=dag,
    python_callable=my_function.main,
    op_kwargs= {**{'param1': param1, 'param2': param2},
    trigger_rule=TriggerRule.ALL_SUCCESS
)

custom_task
oprakyz7

oprakyz71#

您有两个选项来实现此目的:
1.使用dag_run.conf

param2="{{ dag_run.conf['param2'] if 'param2' in dag_run.conf else 'default_value' }}"

1.使用Params

dag = DAG(
    dag_id = file_name,
    default_args=my_function.default_args,
    catchup=False,
    schedule_interval=None,
    tags = ['my_function'],
    params={
        "param1": "default_value",
        "param2": "default_value",
    }
)

# Parameters
param1="{{ params.param1 }}"
param2="{{ params.param2 }}"

但是您需要检查core.dag_run_conf_overrides_params是否设置为False(默认情况下为True

相关问题