python SQLAlchemy外键外部模式

yzuktlbb  于 2022-12-10  发布在  Python
关注(0)|答案(1)|浏览(164)

我想在SQLAlchemy中创建一个指向外部模式中的表的外键。
我使用的小型服务都连接到同一个数据库,并且没有在此服务中定义我要引用的模型(及其表)。
我尝试如下定义外键:

foreign_key_id = Column(String, ForeignKey("schema.table.column"))

但在执行alembic迁移时出现以下错误

sqlalchemy.exc.ArgumentError: 'SchemaItem' object, such as a 'Column' or a 'Constraint' expected, got 'schema.table.column'

我使用的数据库是postgres。
编辑:完整的表格定义如下:

class TopicBody(Base):

    __tablename__ = "topic_body"

    id = Column(Integer, primary_key=True)

    # This is the issue: 
    cust_id = Column(String, ForeignKey("data_store.customer.cust_id"))

    topic_header_id = Column(Integer, ForeignKey("topic_header.id"))
    source = Column(Enum(Source))
    valid_until = Column(DateTime)
    impact = Column(String)
    user_id = Column(Integer, ForeignKey("user_auth.user.id"))
    status = Column(Enum(Status))
    title = Column(String)
    created_at = Column(DateTime, default=datetime.now())
    modified_at = Column(DateTime, default=datetime.now())
q9yhzks0

q9yhzks01#

看起来您需要将外部模式的表反映到您的元数据中,以便SQLAlchemy知道它。您在问题中包含的错误似乎与预期的异常不匹配,因此我不理解这一点,但我在下面提供了一个完整的示例。
我在此答案中找到信息SQLAlchemy: ForeignKey across schemas
下面是我用来测试的一个测试脚本。注意,setup_external_schema代码只是为了测试而设置数据库,您的代码中不需要它,因为您已经创建了外部模式(我假设):

import sys
from sqlalchemy import (
    create_engine,
    Integer,
    String,
    ForeignKey,
    MetaData,
    Table,
    Column,
)
from sqlalchemy.orm import (
    declarative_base,
    Session,
)
from sqlalchemy.schema import CreateSchema

Base = declarative_base()

username, password, db = sys.argv[1:4]

engine = create_engine(f"postgresql+psycopg2://{username}:{password}@/{db}", echo=False)

def setup_external_schema(engine, other_schema_name):
    """ This is just to create the external schema for testing. """

    # @NOTE: After we create this we don't use this metadata again.
    with engine.connect() as conn:
        from sqlalchemy import event

        other_metadata = MetaData()
        event.listen(other_metadata, "before_create", CreateSchema(other_schema_name))
        Table(
            "customer",
            other_metadata,
            Column("cust_id", String, primary_key=True),
            schema=other_schema_name,
        )
        other_metadata.create_all(conn)

# Create the external schema for testing.
setup_external_schema(engine, "data_store")

class TopicBody(Base):

    __tablename__ = "topic_body"

    __table_args__ = dict(schema="public")

    id = Column(Integer, primary_key=True)

    cust_id = Column(String, ForeignKey("data_store.customer.cust_id"))

Base.metadata.reflect(engine, schema="data_store", only=["customer"])

Base.metadata.create_all(engine)

with Session(engine) as session, session.begin():
    pass

异步

对于async,你需要将某些调用,如reflect和createall, Package 在'run_sync中,如下所示:

import asyncio
#...
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.sql import select
#...

async def async_main():
    # Start using async.
    async_engine = create_async_engine(f"postgresql+asyncpg://{username}:{password}@/{db}", echo=False)
    async with async_engine.begin() as conn:
            # Using lambda to make a partial here to pass schema and only.
            await conn.run_sync(lambda engine: Base.metadata.reflect(engine, schema="data_store", only=["customer"]))
            await conn.run_sync(Base.metadata.create_all)

    async with async_engine.connect() as conn:
        result = await conn.execute(select(TopicBody.__table__))
        print(result.fetchall())

asyncio.run(async_main())

相关问题