pandas SQLAlchemy在查询[INFORMATION_SCHEMA]时插入期间挂起,[TABLES]

bprjcwpo  于 12个月前  发布在  其他
关注(0)|答案(4)|浏览(113)

我有一个Python进程,它使用SQLAlchemy将一些数据插入到MS SQL Server DB中。当Python进程运行时,它在插入过程中挂起。我打开SQLAlchemy日志记录以获取更多信息。我发现它挂起在SQLAlchemy似乎正在请求有关整个DB的表模式信息的位置:

2020-10-30 08:12:07 [11444:6368] sqlalchemy.engine.base.Engine._execute_context(base.py:1235) INFO: SELECT [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME] 
FROM [INFORMATION_SCHEMA].[TABLES] 
WHERE [INFORMATION_SCHEMA].[TABLES].[TABLE_SCHEMA] = CAST(? AS NVARCHAR(max)) AND [INFORMATION_SCHEMA].[TABLES].[TABLE_TYPE] = CAST(? AS NVARCHAR(max)) ORDER BY [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME]
2020-10-30 08:12:07 [11444:6368] sqlalchemy.engine.base.Engine._execute_context(base.py:1240) INFO: ('dbo', 'BASE TABLE')

字符串
此时,数据库中还有其他“东西”,包括一些打开的事务,我猜无论出于什么原因,查询[INFORMATION_SCHEMA].[TABLES]都会以某种方式创建一些死锁或阻塞。
我还读到(here[INFORMATION_SCHEMA].[TABLES]是一个不会导致死锁的视图,这与我对导致这个问题的原因的猜测相矛盾。
我的问题是:我可以改变SQLAlchemy的配置/设置,使它首先不进行此查询吗?

UPDATE 1:插入的Python代码如下:

with sqlalchemy.create_engine("mssql+pyodbc:///?odbc_connect=%s" % params).connect() as connection:
    # df is a Pandas DataFrame
    df.to_sql(name=my_table, con=connection, if_exists='append', index=False)


请注意,当我在一天中没有其他数据库事务进行的其他时间运行Python脚本时,代码工作没有任何问题。在这些情况下,日志立即继续如下所示,列出数据库中的所有表:

2020-10-30 08:13:03 [11444:6368] sqlalchemy.engine.base.Engine._execute_context(base.py:1235) INFO: SELECT [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME] 
FROM [INFORMATION_SCHEMA].[TABLES] 
WHERE [INFORMATION_SCHEMA].[TABLES].[TABLE_SCHEMA] = CAST(? AS NVARCHAR(max)) AND [INFORMATION_SCHEMA].[TABLES].[TABLE_TYPE] = CAST(? AS NVARCHAR(max)) ORDER BY [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME]
2020-10-30 08:13:03 [11444:6368] sqlalchemy.engine.base.Engine._execute_context(base.py:1240) INFO: ('dbo', 'BASE TABLE')
2020-10-30 08:13:03 [11444:6368] sqlalchemy.engine.base.Engine._init_metadata(result.py:810) DEBUG: Col ('TABLE_NAME',)
2020-10-30 08:13:03 [11444:6368] sqlalchemy.engine.base.Engine.process_rows(result.py:1260) DEBUG: Row ('t_table1',)
2020-10-30 08:13:03 [11444:6368] sqlalchemy.engine.base.Engine.process_rows(result.py:1260) DEBUG: Row ('t_table2',)
...

**UPDATE 2:**显然,当一个表或其他对象在一个打开的事务中被创建并且还没有提交时,查询[INFORMATION_SCHEMA].[TABLES]将被阻塞(source)。有谁熟悉SQLAlchemy的内部原理,可以建议如何防止它首先进行这种查询?
更新3:在SQLAlchemy github(issue link))上发布这个问题后,SQLAlchemy开发人员确认[INFORMATION_SCHEMA].[TABLES]的查询实际上是由Pandas functionto_sql()引起的。

所以,我的新问题是,有人知道如何在Pandas to_sql()函数中禁用此行为吗?我查看了文档,找不到任何似乎有帮助的东西。

9jyewag0

9jyewag01#

在连接字符串中提到ODBC驱动程序的正确版本已经为我解决了这个问题。我不知道它是如何解决的,但错误已经消失了。
示例代码

from sqlalchemy import create_engine

db_connection_str = ( 
    'mssql+pyodbc://test_user:test_password@localhost/test_database'
    '?driver=ODBC+Driver+17+for+SQL+Server&TrustServerCertificate=yes'
)
engine = create_engine(db_connection_str)

df = pd.DataFrame({'id': [1, 2, 3], 'name': ['John', 'Mary', 'Peter']})
df.to_sql("my_empty_table", engine,if_exists="append", index=False)

字符串

ldfqzlk8

ldfqzlk82#

我对SQLAlchemy不是很熟悉,但我可以告诉你这个问题的Pandas方面。
如果表不存在,Pandas会自动创建一个新表。它判断表是否存在的方法是在SQL Alchemy中调用has_table()has_table()的工作方式是查询信息模式。(至少在MySQL和MSSQL中是这样工作的。)

实现细节

下面是我在Pandas和SQLAlchemy中跟踪这个逻辑时发现的。我们从pandas/io/sql.py开始,在to_sql()中。

table = SQLTable(
            name,
            self,
            frame=frame,
            index=index,
            if_exists=if_exists,
            index_label=index_label,
            schema=schema,
            dtype=dtype,
        )
        table.create()

字符串
SQLTable.create()在这里定义:

class SQLTable(PandasObject):
    [...]
    def create(self):
        if self.exists():
            if self.if_exists == "fail":
                raise ValueError(f"Table '{self.name}' already exists.")
            elif self.if_exists == "replace":
                self.pd_sql.drop_table(self.name, self.schema)
                self._execute_create()
            elif self.if_exists == "append":
                pass
            else:
                raise ValueError(f"'{self.if_exists}' is not valid for if_exists")
        else:
            self._execute_create()


请注意,它无条件地调用exists()。在SQLTable.exists()中,您会发现:

def exists(self):
        return self.pd_sql.has_table(self.name, self.schema)


这最终会在SQLAlchemy中调用has_table():https://docs.sqlalchemy.org/en/13/core/internals.html#sqlalchemy.engine.default.DefaultDialect.has_table
对于MSSQL,这是在SQLAlchemy的sqlalchemy/dialects/mssql/base.py中实现的:

@_db_plus_owner
    def has_table(self, connection, tablename, dbname, owner, schema):
        if tablename.startswith("#"):  # temporary table
            [...]
        else:
            tables = ischema.tables

            s = sql.select(tables.c.table_name).where(
                sql.and_(
                    tables.c.table_type == "BASE TABLE",
                    tables.c.table_name == tablename,
                )
            )

            if owner:
                s = s.where(tables.c.table_schema == owner)

            c = connection.execute(s)

            return c.first() is not None


ischema是information_schema的缩写,这段代码是在那个表上运行select。)

如何修复

我没有看到一个好的,简单的方法来解决这个问题。Pandas假设has_table()是一个廉价的操作。MSSQL不遵循这个假设。无论if_exists设置为什么,Pandas都会在to_sql()期间调用has_table()
不过,我可以想到一种很简单的方法来实现这一点。如果你要monkey-patchpandas.io.sql.SQLTable.create(),那么它就是一个no-op,那么你可以欺骗Pandas认为表已经存在。这样做的缺点是Pandas不会自动创建表。

uurity8g

uurity8g3#

execute set transaction isolation level read uncommitted before calling to_sql/

zf9nrax1

zf9nrax14#

我做这期杂志是为了

只有当表名中有任何小写字母时才会发生这种情况。MyTable将被卡住,而mytable将成功完成。
这是Pandas需要纠正的问题吗?还是SQLAchemy?

解决办法

有一个解决方法-在SQLAchemy引擎中将Transaction Isolation Level设置为Read Uncommitted
https://docs.sqlalchemy.org/en/20/dialects/mssql.html#transaction-isolation-level

engine = sa.create_engine("mssql+pyodbc:///?odbc_connect={}".format(params)
                          , fast_executemany=True
                           , isolation_level="READ UNCOMMITTED"
                          )

字符串

相关问题