python 使用带有异步驱动程序的sqlalchemy迁移数据库的Pytest fixture

hsgswve4  于 2023-01-08  发布在  Python
关注(0)|答案(1)|浏览(124)

问题

我正在尝试编写一个fixture,它将sqlite数据库迁移到由基于declarative_base的ORM模型描述的状态。
按照sqlalchemy docs的指导,我已经找到了一些可行的解决方案,尽管我不知道如何在模块中进行测试之前自动删除await migrate_db.__anext__()或运行migrate_db fixture。

from asyncio import current_task

import pytest
from sqlalchemy import Column, String, Integer, SmallInteger, select
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine, async_scoped_session, \
    AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True)
    name = Column(String)
    quantity = Column(SmallInteger)

@pytest.fixture(scope="module", autouse=True)
def engine() -> AsyncEngine:
    yield create_async_engine(
        'sqlite+aiosqlite://', future=True, echo=True, connect_args={"check_same_thread": False}
    )

@pytest.fixture(scope="module", autouse=True)
async def migrate_db(engine):
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)
    yield
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)

@pytest.fixture(scope="module")
def async_session_maker(engine):
    yield async_scoped_session(
        sessionmaker(engine, expire_on_commit=False, class_=AsyncSession), scopefunc=current_task
    )

@pytest.mark.asyncio
async def test_get(async_session_maker, migrate_db):

    await migrate_db.__anext__()  # Removing this line causes 
    # sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: items not exits.

    async with async_session_maker() as session:
        await session.execute(
            "INSERT INTO items (id, name, quantity) VALUES "
            '(0, "crowbar", 13),'
            '(1, "lamp", 94)'
        )
        stmt = select(Item).where(Item.id == 1)
        rows = await session.execute(stmt)
        result = rows.scalar_one()

        assert result.id == 1
        assert result.name == "lamp"
        assert result.quantity == 94

我试过:

  • 使用asyncio.wait([asyncio.create_task(migate_db_)])作为同步函数,但它不起作用。
  • 正在将async_session_maker转换为等待migrate_db的异步设备,但将async_session_maker转换为异步生成器必须以类似的方式等待migrate_db。
  • 考虑将迁移放入pytest_sessionstart,尽管看起来不正确,因为此数据库设置应仅用于指定模块,而不是整个测试套件。

我期待的是

拥有一个fixture,在调用后将数据库迁移到声明性基描述的状态,而无需在测试中显式等待。

5gfr0r5j

5gfr0r5j1#

通过将设置和拆卸逻辑放入单独的异步函数中,并用在事件循环中运行任务的正常fixture替换异步fixture,设法解决了这个问题。

async def create_all(engine):
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

async def drop_all(engine):
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)

@pytest.fixture(scope="module", autouse=True)
def migrate_db(engine):
    loop = asyncio.get_event_loop()
    loop.run_until_complete(create_all(engine))
    del loop

    yield

    loop = asyncio.get_event_loop()
    loop.run_until_complete(drop_all(engine))
    del loop

相关问题