我对python和airflow都是新手。我正在使用气流的orcale连接。
从afflow.providers.oracle.hooks.oracle导入cx_oracle从afflow.hooks.base_hook导入basehook
以下代码已起作用:
last_snap_date = '2021-03-23'
value = hook.get_records (sql="select distinct ven as name from Execution Metrics where date > TO_DATE(\'"+last_snap_date2+"\',\'YYYY-MM-DD\')")
但是使用绑定变量的需求是最高的。这里,last_snap_date2是我希望使用sql绑定变量传递给方法“get_records”的变量。我尝试了多个步骤,但都失败了。
def _process():
print('***********Oracle Conection***************')
#Oracle Conection
hook = OracleHook(oracle_conn_id="Oracle")
last_snap_date = '2021-03-23'
value = hook.get_records (sql="select distinct ven as name from Execution Metrics where date > TO_DATE(\'"+last_snap_date2+"\',\'YYYY-MM-DD\')")
with DAG(
dag_id='OracleQuery',
start_date=dates.days_ago(2),
schedule_interval='@monthly',
catchup=False,
) as dag :
task_a =PythonOperator(
task_id="QueryIntegration",
python_callable=_process
)
获取记录方法的文件,以供参考:
def get_records(self, sql, parameters=None):
"""
Executes the sql and returns a set of records.
:param sql: the sql statement to be executed (str) or a list of
sql statements to execute
:type sql: str or list
:param parameters: The parameters to render the SQL query with.
:type parameters: dict or iterable
"""
with closing(self.get_conn()) as conn:
with closing(conn.cursor()) as cur:
if parameters is not None:
cur.execute(sql, parameters)
else:
cur.execute(sql)
return cur.fetchall()
感谢您的帮助。唐伯克斯
暂无答案!
目前还没有任何答案,快来回答吧!