Python: executing multiple SQL statements in Airflow using execute_string

Posted by fbcarpbf on 2023-01-29 in Python

I need to run two SQL statements in the same Snowflake session, so I am trying to use the connector's execute_string from Airflow to do this.
My snowflake_hook.py looks like this:

from airflow.hooks.base_hook import BaseHook
class SnowflakeHook(BaseHook):
    

    def __init__(self, sf_conn_id, warehouse=None, source=None):
        """Snowflake hook init.

        Arguments:
        sf_conn_id: Airflow connection ID providing Snowflake credential.
        warehouse: Warehouse override.  Defaults to "warehouse" key in the connection extra section.
        source: Hook source to pass to the parent class constructor.
        """
        import snowflake.connector

        super().__init__(source)
        tmp_conn = self.get_connection(sf_conn_id)
        self.user = tmp_conn.login
        self.password = tmp_conn.password
        extras = tmp_conn.extra_dejson
        self.account = extras.get("account", None)
        self.warehouse = warehouse if warehouse is not None else extras.get("warehouse", None)
        self.database = extras.get("database", None)
        self.schema = tmp_conn.schema
        self.role=extras.get("role", None)
        self.conn = snowflake.connector.connect(
            user=self.user,
            password=self.password,
            account=self.account,
            warehouse=self.warehouse,
            database=self.database,
            schema=self.schema,
            role=self.role,
        )
    
    def execute(self, qry):
        """Execute a sql statement"""
        cs = self.conn.cursor()
        try:
            cs.execute(qry)
            return cs.fetchall()
        finally:
            cs.close()

I added this method to snowflake_hook:

    def execute_string(self, context):
        """Executes one or more SQL statements separated by semi-colons"""
        cs = self.conn.cursor()
        try:
            cs.execute_string(context)
            return cs.fetchall()
        finally:
            cs.close()

My Airflow DAG uses it to execute the SQL statements, like this:

def delete(sf_conn_id):

        from custom.hooks.snowflake_hook import SnowflakeHook
        table_list_sql = """
                        create temporary table dim_expired  
                            as select * from expired_insert_stream; 
                        select distinct id  
                          from dim_expired
                        """
                        
        sf = SnowflakeHook(sf_conn_id)
        print(table_list_sql)
        res = sf.execute_string(table_list_sql)

This fails with the error "UnboundLocalError: local variable 'sf' referenced before assignment".

owfi6suc #1

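Is there a reason you are not using the SnowflakeOperator?
For future reference, you can execute multiple SQL statements with that operator, without writing a custom hook: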

from airflow.decorators import dag
from pendulum import datetime
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

@dag(
    dag_id="snowflake_basic",
    start_date=datetime(2022,12,10),
    schedule="@daily",
    catchup=False,
)
def snowflake_basic():

    t1 = SnowflakeOperator(
        task_id="t1",
        snowflake_conn_id="snowflake_conn",
        sql="""create temporary table temp_table_4  
            as select * from table_2; 
            select DISTINCT(customer_id)  
            from temp_table_4;"""
    )

snowflake_basic()

You can also provide your statements in a SQL file. Tested with Airflow 2.5.0 and Snowflake provider 4.0.2.
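If you would rather keep the custom hook, here is a minimal sketch of how the multi-statement call could look. It assumes that `execute_string` is a method of the snowflake-connector-python connection object (not of the cursor, which as far as I know has no such method) and that it returns one cursor per statement. The helper name `run_in_one_session` is made up for illustration.

```python
def run_in_one_session(conn, sql_text):
    """Run semicolon-separated statements on a single connection.

    `conn` is assumed to be a snowflake.connector connection (or anything
    exposing a compatible execute_string). All statements share the same
    session, so a temporary table created by an earlier statement is
    visible to later ones. Returns the rows of the last statement.
    """
    last_rows = None
    # execute_string is assumed to yield one cursor per statement, in order.
    for cur in conn.execute_string(sql_text):
        try:
            last_rows = cur.fetchall()
        finally:
            cur.close()
    return last_rows
```

In the hook from the question, `execute_string(self, context)` could then simply return `run_in_one_session(self.conn, context)`. Note that the first statement (the `create temporary table`) also produces a result set, a status row, which this sketch reads and discards.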
