我正在airflow(gc composer)中设置我的dag,并尝试在我的应用程序中使用jinja2模板 DataProcHiveOperator
. 但我得到了一个 jinja2.exceptions.UndefinedError: 'jinja' is undefined error
. 然而,在我的代码中,没有一个地方我称之为“jinja”。我错过了什么?
对于其他类似的问题,我已经尝试过很多建议,但都没有奏效(可能是因为我遗漏了一些东西)。我试图在我的dag文件中创建一个名为“jinja”的对象,但也没有成功。
这是我的狗
from airflow.models import DAG, Variable
from airflow.contrib.operators.dataproc_operator import DataProcHiveOperator, DataprocClusterScaleOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta, time
execution_date = "{{ ds }}"
args = {
'owner': 'Raul Gregg',
'start_date': datetime(2018,12,12),
'provide_context': True,
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(seconds=30),
'project_id': Variable.get('gcp_project'),
'cluster_name': 'hive-cluster',
'region': 'europe-west3'
}
dag = DAG('dag_testing',
default_args=args,
max_active_runs=9,
schedule_interval=timedelta(days=1)
)
test_dim_facets = DataProcHiveOperator(
task_id='test_dim_facets',
query='/hql/tests/tests_dim_facets_1.q',
dag=dag
)
这里是.q文件,由于上面的错误,dag甚至没有调用它;
SELECT
distinct original_color_flag,
{{ execution_date }} as exec_date
from omni_offer.dim_color_flagged_models;
我只想用jinja2日期模板运行一个查询。很简单,但很难实现。谢谢!
1条答案
按热度按时间rsaldnfx1#
我设法用自己的代码找到了问题所在。为了使dataprochiveoperator正常工作,有必要创建一个名为“jinja”的对象,其中包含要使用的关键字字典。以下是我添加到.py代码中的片段:
现在,如果调用.q文件中的那些项,它将相应地读取并替换文本。