postgresql 气流和模板参考和PostgresHook

vuktfyat  于 2023-01-30  发布在  PostgreSQL
关注(0)|答案(2)|浏览(112)

我有一个问题,我想使用模板参考- {{ds}}当在PostgresOperator中替换时,一切都很好(我想是这样),但PostgresHook不想工作

def prc_mymys_update(procedure: str, type_agg: str):
    with PostgresHook(postgres_conn_id=CONNECTION_ID_GP).get_conn() as conn:
        with conn.cursor() as cur:
            with open(URL_YML_2,"r", encoding="utf-8") as f:
                ya_2 = yaml.safe_load(f)
                yml_mymts_2 = ya_2['type_agg']
                query_pg = ""
                if yml_mymts_2[0]['type_agg_name'] == "day" and type_agg == "day":
                    sql_1 = yml_mymts_2[0]['sql']
                    query_pg = f"""{sql_1}"""
                elif yml_mymts_2[1]['type_agg_name'] == "retention" and type_agg == "retention":
                    sql_2 = yml_mymts_2[1]['sql']
                    query_pg = f"""{sql_2}"""
                elif yml_mymts_2[2]['type_agg_name'] == "mau" and type_agg == "mau":
                    sql_3 = yml_mymts_2[2]['sql']
                    query_pg = f"""{sql_3}"""
                cur.execute(query_pg)
                dates_list = cur.fetchall()
                for date_res in dates_list:
                    cur.execute(
                        "select from {}(%(date)s::date);".format(procedure),
                        {"date": date_res[0].strftime("%Y-%m-%d")},
                    )
    conn.close()

我使用yml

type_agg:
  - type_agg_name: day
    sql: select calendar_date from entertainment_dds.v_calendar where calendar_date between '{{ds}}'::date - interval '7 days' and '{{ds}}'::date - 1 order by 1 desc
  - type_agg_name: retention
    sql: SELECT t.date::date AS date FROM generate_series((date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) - interval '11 month'), date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) , '1 month'::interval) t(date) order by 1 asc
  - type_agg_name: mau
    sql: select dt::date date_ from generate_series('{{execution_date.strftime('%Y-%m-%d')}}'::date - interval '7 days', '{{execution_date.strftime('%Y-%m-%d')}}'::date - interval '1 days', interval '1 days') dt order by 1 asc

当我运行dag时,它会执行某个任务,该任务使用

- type_agg_name: retention
    sql: SELECT t.date::date AS date FROM generate_series((date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) - interval '11 month'), date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) , '1 month'::interval) t(date) order by 1 asc

我错了
psycopg2.errors.UndefinedColumn:列“y”不存在第1行:...((日期截断('月份','{{执行日期.字符串时间(' %Y-%m-% d ')}...
enter image description here
我试图查找有关Templates引用和PostgresHook交互的信息,但一无所获
https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#templates-reference

zzzyeukh

zzzyeukh1#

这是预期的。templated_fields是Airflow中BaseOperator的一个属性,所有操作符都从该属性继承。这就是在使用PostgresOperator时传入Jinja表达式的工作方式。
如果你需要编写一个自定义任务,你需要显式地呈现模板值,就像这样,未经测试,但我确信这可以在你的函数中推断出来:

def prc_mymys_update(procedure: str, type_agg: str, ti):
    ti.render_templates()

    with PostgresHook(postgres_conn_id=CONNECTION_ID_GP).get_conn() as conn:
        with conn.cursor() as cur:
            ...

ti kwargs表示Airflow任务示例,可作为执行上下文的一部分直接访问,该执行上下文推送到Airflow中的每个任务。该对象具有render_templates()方法,该方法将Jinja表达式转换为值。
如果PostgresOperator不符合您的需要,您可以将操作符子类化,并相应地对其进行调整。
此外,sql字符串本身有单引号,这会导致字符串解析问题,如您所见:'{{execution_date.strftime('%Y-%m-%d')}}'应类似于:'{{execution_date.strftime("%Y-%m-%d")}}'

fslejnso

fslejnso2#

请注意以下查询中的单引号:

sql: SELECT t.date::date AS date FROM generate_series((date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) - interval '11 month'), date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) , '1 month'::interval) t(date) order by 1 asc

具体而言,本部分:

'{{execution_date.strftime('%Y-%m-%d')}}'

这里有两个独立的字符串,以日期格式分隔,第一个字符串如下:

'{{execution_date.strftime('

这将导致日期格式单独呈现。如果将日期格式括在双引号中而不是单引号中,则应该可以解决此错误。例如:

sql: SELECT t.date::date AS date FROM generate_series((date_trunc('month','{{execution_date.strftime("%Y-%m-%d")}}'::date) - interval '11 month'), date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) , '1 month'::interval) t(date) order by 1 asc

请注意,如果RDBMS中的双引号用于其他目的,则可能需要交换双引号和单引号,例如:

"{{execution_date.strftime('%Y-%m-%d')}}"

相关问题