我有一个带有PostgresOperator的Airflow dag来执行SQL查询。我想切换到我的测试数据库或带有config的prod数据库(使用config运行)。但postgres_conn_id不是模板字段,因此PostgresOperator说“{{ dag_run.conf.get('CONN_ID_TEST','pg_database')}}”不是连接。我使用{“CONN_ID_TEST”运行此脚本:“pg数据库测试”}配置
我尝试使用the same code of Airflow github创建一个自定义postgresql操作符,并在类CustomPostgresOperator的顶部添加template_fields: Sequence[str] = ("postgres_conn_id",)
,但这也不起作用(同样的错误)。
我有两个conn_id env变量:
- 气流连接器ID PG数据库(生产)
- 气流连接器ID PG数据库测试(测试)
我的脚本如下所示:
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.dummy import DummyOperator
DAG_ID = "init_database"
POSTGRES_CONN_ID = "{{ dag_run.conf.get('CONN_ID_TEST', 'pg_database') }}"
with DAG(
dag_id=DAG_ID,
description="My dag",
schedule_interval="@once",
start_date=dt.datetime(2022, 1, 1),
catchup=False,
) as dag:
start = DummyOperator(task_id = 'start')
my_task = PostgresOperator( #### OR CustomPostgresOperator
task_id="select",
sql="SELECT * FROM pets LIMIT 1;",
postgres_conn_id=POSTGRES_CONN_ID,
autocommit=True
)
start >> my task
我如何处理来解决我的问题?如果不可能,我如何将我的PostgresOperator连接切换到我的dev数据库,而不重新创建其他DAG脚本?
谢谢你,利奥
1条答案
按热度按时间uklbhaso1#
子类化是修改
template_fields
的可靠方法,因为template_fields
是一个class属性,所以子类只需如下所示(假设您只是将连接ID添加到现有的template_fields
中):上面使用的是Postgres提供程序版本5.3.1,它实际上使用了Common SQL provider,因此连接参数实际上是
conn_id
。(template_fields
指的是示例属性名,而不是参数名。)例如,假设使用
{"environment": "dev"}
的运行配置触发以下DAG:查看任务日志,连接ID“dev”用于执行SQL: