python 如何在PostgresOperator的配置模板中使用conn_id运行气流dag?

fumotvh3  于 2023-01-24  发布在  Python
关注(0)|答案(1)|浏览(105)

我有一个带有PostgresOperator的Airflow dag来执行SQL查询。我想切换到我的测试数据库或带有config的prod数据库(使用config运行)。但postgres_conn_id不是模板字段,因此PostgresOperator说“{{ dag_run.conf.get('CONN_ID_TEST','pg_database')}}”不是连接。我使用{“CONN_ID_TEST”运行此脚本:“pg数据库测试”}配置
我尝试使用the same code of Airflow github创建一个自定义postgresql操作符,并在类CustomPostgresOperator的顶部添加template_fields: Sequence[str] = ("postgres_conn_id",),但这也不起作用(同样的错误)。
我有两个conn_id env变量:

  • 气流连接器ID PG数据库(生产)
  • 气流连接器ID PG数据库测试(测试)

我的脚本如下所示:

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.dummy import DummyOperator

DAG_ID = "init_database"
POSTGRES_CONN_ID = "{{ dag_run.conf.get('CONN_ID_TEST', 'pg_database') }}"

with DAG(
    dag_id=DAG_ID, 
    description="My dag",
    schedule_interval="@once",
    start_date=dt.datetime(2022, 1, 1),
    catchup=False,
    ) as dag:

    start = DummyOperator(task_id = 'start')
  
    my_task = PostgresOperator(                     #### OR CustomPostgresOperator
        task_id="select",
        sql="SELECT * FROM pets LIMIT 1;",
        postgres_conn_id=POSTGRES_CONN_ID,
        autocommit=True
        )

start >> my task

我如何处理来解决我的问题?如果不可能,我如何将我的PostgresOperator连接切换到我的dev数据库,而不重新创建其他DAG脚本?
谢谢你,利奥

uklbhaso

uklbhaso1#

子类化是修改template_fields的可靠方法,因为template_fields是一个class属性,所以子类只需如下所示(假设您只是将连接ID添加到现有的template_fields中):

from airflow.providers.postgres.operators.postgres import PostgresOperator as _PostgresOperator

class PostgresOperator(_PostgresOperator):
    template_fields = [*_PostgresOperator.template_fields, "conn_id"]

上面使用的是Postgres提供程序版本5.3.1,它实际上使用了Common SQL provider,因此连接参数实际上是conn_id。(template_fields指的是示例属性名,而不是参数名。)
例如,假设使用{"environment": "dev"}的运行配置触发以下DAG:

from pendulum import datetime

from airflow.decorators import dag
from airflow.providers.postgres.operators.postgres import PostgresOperator as _PostgresOperator

class PostgresOperator(_PostgresOperator):
    template_fields = [*_PostgresOperator.template_fields, "conn_id"]

@dag(start_date=datetime(2023, 1, 1), schedule=None)
def template_postgres_conn():
    PostgresOperator(task_id="run_sql", sql="SELECT 1;", postgres_conn_id="{{ dag_run.conf['environment'] }}")

template_postgres_conn()

查看任务日志,连接ID“dev”用于执行SQL:

相关问题