I need to run two SQL statements in the same Snowflake session, so I tried to do this from Airflow using the connector's execute_string.
My snowflake_hook.py looks like this:
from airflow.hooks.base_hook import BaseHook


class SnowflakeHook(BaseHook):
    def __init__(self, sf_conn_id, warehouse=None, source=None):
        """Snowflake hook init.

        Arguments:
            sf_conn_id: Airflow connection ID providing Snowflake credential.
            warehouse: Warehouse override. Defaults to "warehouse" key in the connection extra section.
            source: Hook source to pass to the parent class constructor.
        """
        import snowflake.connector

        super().__init__(source)
        tmp_conn = self.get_connection(sf_conn_id)
        self.user = tmp_conn.login
        self.password = tmp_conn.password
        extras = tmp_conn.extra_dejson
        self.account = extras.get("account", None)
        self.warehouse = warehouse if warehouse is not None else extras.get("warehouse", None)
        self.database = extras.get("database", None)
        self.schema = tmp_conn.schema
        self.role = extras.get("role", None)
        self.conn = snowflake.connector.connect(
            user=self.user,
            password=self.password,
            account=self.account,
            warehouse=self.warehouse,
            database=self.database,
            schema=self.schema,
            role=self.role,
        )

    def execute(self, qry):
        """Execute a sql statement"""
        cs = self.conn.cursor()
        try:
            cs.execute(qry)
            return cs.fetchall()
        finally:
            cs.close()
I added this method to the hook:
    def execute_string(self, context):
        """Executes one or more SQL statements separated by semi-colons"""
        cs = self.conn.cursor()
        try:
            cs.execute_string(context)
            return cs.fetchall()
        finally:
            cs.close()
My Airflow DAG uses it to execute the SQL statements like this:
def delete(sf_conn_id):
    from custom.hooks.snowflake_hook import SnowflakeHook

    table_list_sql = """
    create temporary table dim_expired
    as select * from expired_insert_stream;
    select distinct id
    from dim_expired
    """
    sf = SnowflakeHook(sf_conn_id)
    print(table_list_sql)
    res = sf.execute_string(table_list_sql)
This fails with the error "UnboundLocalError: local variable 'sf' referenced before assignment".
1 answer
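Is there a reason you are not using SnowflakeOperator?
For future reference, that operator can execute multiple SQL statements without you having to write a custom hook.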
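A minimal sketch of what that can look like, assuming the Snowflake provider is installed and an Airflow connection exists. The DAG id, task id, start date and the `snowflake_default` connection id are hypothetical placeholders, and the SQL is taken from the question. The Airflow imports are deferred into the function, in the same style as the question's code.

```python
# SQL from the question, passed to the operator as a single string.
DELETE_SQL = """
create temporary table dim_expired
as select * from expired_insert_stream;
select distinct id
from dim_expired;
"""


def build_dag(sf_conn_id="snowflake_default"):
    """Build a DAG with one SnowflakeOperator task (all names are hypothetical)."""
    # Deferred imports, as in the question's code, so that this module
    # can be loaded even where Airflow is not installed.
    import datetime

    from airflow import DAG
    from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

    with DAG(
        dag_id="snowflake_delete_example",
        start_date=datetime.datetime(2023, 1, 1),
        schedule=None,  # trigger manually; `schedule` assumes Airflow 2.4 or later
        catchup=False,
    ) as dag:
        SnowflakeOperator(
            task_id="delete",
            snowflake_conn_id=sf_conn_id,
            sql=DELETE_SQL,
        )
    return dag
```

In a DAG file you would typically add `dag = build_dag()` at module level so that the scheduler can discover the DAG.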
You can also supply your statements in a SQL file. Tested with Airflow 2.5.0 and Snowflake provider 4.0.2.
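If you would rather keep the custom hook, one detail worth checking: in snowflake-connector-python, `execute_string` is a method of the connection object rather than of the cursor, and it returns a list of cursors, one per statement. Below is a rough sketch of a helper written under that assumption. It is shown as a standalone function taking the connection (for example `self.conn` from the hook) and it returns only the rows of the last statement, which is a design choice for this example and not something the connector requires.

```python
def execute_string(conn, sql_text):
    """Run semicolon-separated statements on one connection, i.e. one session.

    `conn` is assumed to be a snowflake.connector connection, such as
    `self.conn` in the hook. Returns the rows of the last statement.
    """
    # The connection's execute_string runs the statements in order and
    # returns a list of cursors, one per executed statement.
    cursors = conn.execute_string(sql_text)
    try:
        # Only the final statement's result set is returned here.
        return cursors[-1].fetchall() if cursors else []
    finally:
        # Close every cursor, including the ones whose results are ignored.
        for cs in cursors:
            cs.close()
```

Inside the hook this would be called as `execute_string(self.conn, sql_text)`, or the body could be moved into the hook's own `execute_string` method with `self.conn` in place of `conn`.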