我有一个问题,我想使用模板参考- {{ds}}当在PostgresOperator中替换时,一切都很好(我想是这样),但PostgresHook不想工作
def prc_mymys_update(procedure: str, type_agg: str):
with PostgresHook(postgres_conn_id=CONNECTION_ID_GP).get_conn() as conn:
with conn.cursor() as cur:
with open(URL_YML_2,"r", encoding="utf-8") as f:
ya_2 = yaml.safe_load(f)
yml_mymts_2 = ya_2['type_agg']
query_pg = ""
if yml_mymts_2[0]['type_agg_name'] == "day" and type_agg == "day":
sql_1 = yml_mymts_2[0]['sql']
query_pg = f"""{sql_1}"""
elif yml_mymts_2[1]['type_agg_name'] == "retention" and type_agg == "retention":
sql_2 = yml_mymts_2[1]['sql']
query_pg = f"""{sql_2}"""
elif yml_mymts_2[2]['type_agg_name'] == "mau" and type_agg == "mau":
sql_3 = yml_mymts_2[2]['sql']
query_pg = f"""{sql_3}"""
cur.execute(query_pg)
dates_list = cur.fetchall()
for date_res in dates_list:
cur.execute(
"select from {}(%(date)s::date);".format(procedure),
{"date": date_res[0].strftime("%Y-%m-%d")},
)
conn.close()
我使用yml
type_agg:
- type_agg_name: day
sql: select calendar_date from entertainment_dds.v_calendar where calendar_date between '{{ds}}'::date - interval '7 days' and '{{ds}}'::date - 1 order by 1 desc
- type_agg_name: retention
sql: SELECT t.date::date AS date FROM generate_series((date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) - interval '11 month'), date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) , '1 month'::interval) t(date) order by 1 asc
- type_agg_name: mau
sql: select dt::date date_ from generate_series('{{execution_date.strftime('%Y-%m-%d')}}'::date - interval '7 days', '{{execution_date.strftime('%Y-%m-%d')}}'::date - interval '1 days', interval '1 days') dt order by 1 asc
当我运行dag时,它会执行某个任务,该任务使用
- type_agg_name: retention
sql: SELECT t.date::date AS date FROM generate_series((date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) - interval '11 month'), date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) , '1 month'::interval) t(date) order by 1 asc
我错了
psycopg2.errors.UndefinedColumn:列“y”不存在第1行:...((日期截断('月份','{{执行日期.字符串时间(' %Y-%m-% d ')}...
enter image description here
我试图查找有关Templates引用和PostgresHook交互的信息,但一无所获
https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#templates-reference
2条答案
按热度按时间zzzyeukh1#
这是预期的。
templated_fields
是Airflow中BaseOperator
的一个属性,所有操作符都从该属性继承。这就是在使用PostgresOperator
时传入Jinja表达式的工作方式。如果你需要编写一个自定义任务,你需要显式地呈现模板值,就像这样,未经测试,但我确信这可以在你的函数中推断出来:
ti
kwargs表示Airflow任务示例,可作为执行上下文的一部分直接访问,该执行上下文推送到Airflow中的每个任务。该对象具有render_templates()
方法,该方法将Jinja表达式转换为值。如果
PostgresOperator
不符合您的需要,您可以将操作符子类化,并相应地对其进行调整。此外,
sql
字符串本身有单引号,这会导致字符串解析问题,如您所见:'{{execution_date.strftime('%Y-%m-%d')}}'
应类似于:'{{execution_date.strftime("%Y-%m-%d")}}'
fslejnso2#
请注意以下查询中的单引号:
具体而言,本部分:
这里有两个独立的字符串,以日期格式分隔,第一个字符串如下:
这将导致日期格式单独呈现。如果将日期格式括在双引号中而不是单引号中,则应该可以解决此错误。例如:
请注意,如果RDBMS中的双引号用于其他目的,则可能需要交换双引号和单引号,例如: