如何结合SQLAlchemy事件与Azure Active Directory令牌,在类中从pyodbc连接字符串里删除“Trusted_Connection=Yes”?

r6l8ljro  于 2023-06-24  发布在  其他
关注(0)|答案(1)|浏览(129)

SQLAlchemy文档展示了如何使用event.listens_for装饰器,针对一个不在函数或类内部创建的引擎,删除连接字符串中的“Trusted_Connection”部分。以下是示例:

import struct
from sqlalchemy import create_engine, event
from sqlalchemy.engine.url import URL
from azure import identity

SQL_COPT_SS_ACCESS_TOKEN = 1256  # Connection option for access tokens, as defined in msodbcsql.h
TOKEN_URL = "https://database.windows.net/"  # The token URL for any Azure SQL database

# The URL carries no username or password; the access token is attached per
# connection by the "do_connect" listener registered on this engine.
connection_string = "mssql+pyodbc://@my-server.database.windows.net/myDb?driver=ODBC+Driver+17+for+SQL+Server"

# Created at module level so it already exists when the listener decorator
# that targets it is evaluated.
engine = create_engine(connection_string)

# Built once at import time and reused by the listener for every connection.
azure_credentials = identity.DefaultAzureCredential()

@event.listens_for(engine, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
    """Attach an Azure access token to the connection arguments.

    Registered as a ``do_connect`` listener on the module-level ``engine``.
    Mutates ``cargs`` and ``cparams`` in place and returns nothing.
    """
    # SQLAlchemy appends "Trusted_Connection" to the ODBC string; take it out.
    odbc_string = cargs[0]
    cargs[0] = odbc_string.replace(";Trusted_Connection=Yes", "")

    # Fetch a token and pack it as a little-endian length prefix followed by
    # the UTF-16-LE encoded token bytes.
    access_token = azure_credentials.get_token(TOKEN_URL)
    token_bytes = access_token.token.encode("utf-16-le")
    token_len = len(token_bytes)
    packed_token = struct.pack(f"<I{token_len}s", token_len, token_bytes)

    # Hand the packed token to the driver through the connect keyword arguments.
    cparams["attrs_before"] = {SQL_COPT_SS_ACCESS_TOKEN: packed_token}

我面临的问题是:我使用一个名为Db的类连接数据库,它的.engine属性要到实例化时才会创建(创建哪个引擎取决于环境)。当引擎是实例化之后才创建的实例属性时,我该如何使用装饰器从该引擎的连接字符串中删除“Trusted_Connection=Yes”?
下面的代码导致错误:AttributeError: type object 'Db' has no attribute 'engine'

import os
import struct
from azure.identity import DefaultAzureCredential
from sqlalchemy.engine.url import URL
from sqlalchemy import create_engine, event
from sqlalchemy import inspect

class Db:
    """Database handle whose engine is built from environment variables."""

    def __init__(self, config: object) -> None:
        """Create the engine, open a connection and build an inspector.

        The server and database names are read from the ``SERVER_<env>`` and
        ``DATABASE_<env>`` environment variables, where ``<env>`` is
        ``config.environment``.
        """
        server = os.environ.get(f'SERVER_{config.environment}')
        database_name = os.environ.get(f'DATABASE_{config.environment}')

        url = URL.create(
            drivername="mssql+pyodbc",
            host=f"tcp:{server}",
            port=1433,
            database=database_name,
            query={"driver": 'ODBC Driver 18 for SQL Server'},
        )

        self.engine = create_engine(url=url, connect_args={"autocommit": True})
        self.connection = self.engine.connect()
        self.inspector = inspect(subject=self.engine)

    def close(self) -> None:
        """Close the connection opened in ``__init__``."""
        self.connection.close()

# NOTE: this decorator line is what raises the reported AttributeError.
# `engine` is assigned on `self` inside Db.__init__, so the class object `Db`
# has no `engine` attribute to look up when the decorator is evaluated.
@event.listens_for(target=Db.engine, identifier="do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
    """Attach an Azure access token to the connection arguments.

    Intended as a ``do_connect`` listener; mutates ``cargs`` and ``cparams``
    in place and returns nothing.
    """
    # remove the "Trusted_Connection" parameter that SQLAlchemy adds
    cargs[0] = cargs[0].replace(";Trusted_Connection=Yes", "")

    # NOTE(review): `identity` and `TOKEN_URL` are not imported or defined in
    # this snippet; they appear to be carried over from the earlier example.
    azure_credentials = identity.DefaultAzureCredential()
    # Token is encoded as UTF-16-LE and packed behind a little-endian length.
    raw_token = azure_credentials.get_token(TOKEN_URL).token.encode("utf-16-le")
    token_struct = struct.pack(f"<I{len(raw_token)}s", len(raw_token), raw_token)

    # apply it to keyword arguments
    # 1256 is the value named SQL_COPT_SS_ACCESS_TOKEN in the earlier example.
    cparams["attrs_before"] = {1256: token_struct}
q8l4jmvw

q8l4jmvw1#

进一步研究后发现,可以使用event模块的listen方法代替listens_for装饰器:除了target和identifier之外,再把回调函数作为参数传入即可。
解决方案如下:

import os
import struct
from azure.identity import DefaultAzureCredential
from sqlalchemy.engine.url import URL
from sqlalchemy import create_engine, event
from sqlalchemy import inspect

class Db:
    """Database handle that authenticates with an Azure access token.

    The engine is created per instance from environment variables, and a
    ``do_connect`` listener is registered on that engine with ``event.listen``
    so that every new DBAPI connection carries a fresh token.
    """

    # Connection option for access tokens, as defined in msodbcsql.h.
    SQL_COPT_SS_ACCESS_TOKEN = 1256
    # The token URL for any Azure SQL database.
    TOKEN_URL = "https://database.windows.net/"

    def __init__(self, config: object) -> None:
        """Create the engine, register the token hook and open a connection.

        Args:
            config: Object exposing an ``environment`` attribute; it selects
                the ``SERVER_<env>`` and ``DATABASE_<env>`` environment
                variables used to build the URL.
        """
        url = URL.create(
            drivername="mssql+pyodbc",
            port=1433,
            query=dict(driver='ODBC Driver 18 for SQL Server'),
            host=f"tcp:{os.environ.get(f'SERVER_{config.environment}')}",
            database=os.environ.get(f'DATABASE_{config.environment}')
        )
        # Build the credential once per instance instead of on every new
        # connection; the listener below reuses it.
        self._credential = DefaultAzureCredential()
        self.engine = create_engine(url=url, connect_args={"autocommit": True})
        # Must be registered before the first connect() so the very first
        # DBAPI connection already goes through the hook.
        event.listen(target=self.engine, identifier="do_connect", fn=self._add_token)
        self.connection = self.engine.connect()
        self.inspector = inspect(subject=self.engine)

    def _add_token(self, dialect, conn_rec, cargs, cparams):
        """``do_connect`` listener: strip trusted auth and attach the token.

        Mutates ``cargs`` and ``cparams`` in place and returns nothing, so
        SQLAlchemy proceeds with its normal connect call.
        """
        # Remove the "Trusted_Connection" parameter that SQLAlchemy adds; it
        # must not be present when an access token is supplied.
        if cargs:
            cargs[0] = cargs[0].replace(";Trusted_Connection=Yes", "")

        # Token is encoded as UTF-16-LE and packed behind a little-endian
        # 4-byte length prefix.
        raw_token = self._credential.get_token(self.TOKEN_URL).token.encode("utf-16-le")
        token_struct = struct.pack(f"<I{len(raw_token)}s", len(raw_token), raw_token)

        # Apply it to the keyword arguments passed to the driver.
        cparams["attrs_before"] = {self.SQL_COPT_SS_ACCESS_TOKEN: token_struct}

    def close(self) -> None:
        """Close the connection opened in ``__init__``."""
        self.connection.close()

相关问题