我正尝试使用Trigger DAG w/ config
将可选参数传递给我计划手动触发的DAG。它使用PythonOperator
。
我遵循了answer by J.Fell,其中建议显式访问dag_run.conf
传递的参数
param1="{{ dag_run.conf['param1'] }}"
param2="{{ dag_run.conf['param2'] }}"
当我触发运行带有可选参数(param2
)的函数的DAG时,仅传递:{"param1":"value1"}
我得到一个错误:
UndefinedError: 'dict_object' has no attribute 'param2'
如何调整从dag_run.conf
访问值,使可选参数不会抛出错误?
DAG代码示例
__version__ = "@version@"
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
import sys
import socket
import datetime
import my_function
file_name = my_function.get_file_name(__file__)
dag = DAG(
dag_id = file_name,
default_args=my_function.default_args,
catchup=False,
schedule_interval=None,
tags = ['my_function']
)
# Parameters
param1="{{ dag_run.conf['param1'] }}"
param2="{{ dag_run.conf['param2'] }}"
custom_task = PythonOperator(
task_id = "custom_task",
dag=dag,
python_callable=my_function.main,
op_kwargs= {**{'param1': param1, 'param2': param2},
trigger_rule=TriggerRule.ALL_SUCCESS
)
custom_task
1条答案
按热度按时间oprakyz71#
您有两个选项来实现此目的:
1.使用
dag_run.conf
:1.使用
Params
:但是您需要检查core.dag_run_conf_overrides_params是否设置为
False
(默认情况下为True
)