mariadb SQLAlchemy:不同数据库之间的JOIN和在模块中使用不同文件

yftpprvb  于 2022-11-08  发布在  其他
关注(0)|答案(1)|浏览(116)

堆栈

我正在使用:

  • Python 3.10.x
  • 快速API 0.75.x
  • SQLAlchemy 1.4.3x版

摘要

我正在为几个遗留数据库构建一个统一的FastAPI项目(存储在MariaDB 10.3的后端-必须为一些遗留软件保留结构)。
我的SQLA安装程序使用数据库模块来执行以下操作:

/databases.py

import dotenv
import os

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

import .models as models

dotenv.load_dotenv()

engines = {
    'parts': create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/parts", pool_pre_ping=True, pool_recycle=300),
    'shop': create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/shop", pool_pre_ping=True, pool_recycle=300),
    'purchasing': create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/purchasing", pool_pre_ping=True, pool_recycle=300),
    "company": create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/company", pool_pre_ping=True, pool_recycle=300),
    "auth": create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/auth", pool_pre_ping=True, pool_recycle=300),
}

DBSession = sessionmaker(autocommit=False, autoflush=False, binds={
    # Catalogue
    models.Shop.Catalogue: engines["shop"],
    models.Shop.Sections: engines["shop"],
    models.Shop.Orders: engines["shop"],
    # ...
    # Parts
    models.Parts.Part: engines["parts"],
    models.Parts.BinLocations: engines["parts"],

    # ...
    #Purchasing
    models.Purchasing.SupplierOrder: engines["purchasing"],
    models.Purchasing.SupplierOrder: engines["purchasing"],
    # Company Data
    models.Company.Staffmember: engines["company"],
    models.Company.Suppliers: engines["company"],
    # API Auth
    models.Auth.User: engines["auth"],
    models.Auth.Privileges: engines["auth"],
})

# Dependency

def getDb():
    db = DBSession()
    try:
        yield db
    finally:
        db.close()

对每个模型都这样做有点费力,但它确实有效。
由于我有几个数据库,我认为为每个数据库创建一个带有子文件的models模块是合乎逻辑的,例如models.Partsmodels.Shopmodels.Purchasemodels.Companymodels.Auth等。
/模型/初始化.py

from importlib.metadata import metadata
from sqlalchemy.orm import declarative_base

base = declarative_base()

from . import Auth, Parts, Shop, Catalogue, Purchasing, Shop

我可以通过将Base对象导入models__init__.py并将其导入到每个子文件中来成功创建关系。例如:

/型号/Auth.py

from . import base as Base

from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, String, Numeric, Date, DateTime, ForeignKey, null, or_, and_

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, nullable=False, primary_key=True)
    username = Column(String(256), nullable=False)
    passhash = Column(String(512), nullable=False)
    email = Column(String, nullable=False)
    enabled = Column(Integer, nullable=True)
    staffmember_id = Column(Integer, nullable=False)

    staffmember = relationship("Company.Staffmember", uselist=False)

/型号/Company.py

from . import base as Base

from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, String, Numeric, Date, DateTime, ForeignKey, null, or_, and_

class Staffmebmer(Base):
    __tablename__ = 'staffmembers'

    id = Column(Integer, ForeignKey("users.staffmember_id"), nullable=False, primary_key=True)
    order = Column(Integer, default=0, nullable=False)
    name = Column(String, nullable=True)
    initial = Column(String, nullable=True)
    email = Column(String, nullable=False)
    enabled = Column(Integer, default=0, nullable=False)

    relationship("Auth.User", back_populates="staffmember")

下面的方法效果很好:
第1001章:我的demo.py

from fastapi import Depends

from sqlalchemy.orm import Session

from .. import app, databases, models

@app.get("/api/user/{id}")
async def read_items(id: int, db: Session=Depends(databases.getDb)):
    user = db.query(models.Auth.User).filter(
        models.Auth.User.id == id
    ).first()

    user.staffmember

    return user

访问此URL将返回:(是的,我知道这不安全,它仅用于说明目的,以显示关系的功能!)

{
  "username": "mark",
  "passhash": "<my hash>",
  "enabled": 1,
  "email": "mark@demo.com",
  "id": 1,
  "staffmember_id": 5,
  "staffmember": {
    "order": 20,
    "name": "Mark",
    "email": "mark@demo.com",
    "kStaffmember": 5,
    "initial": "MB",
    "enabled": 1
  }
}

但是,我想使用steffmember的首字母作为可能的用户名,所以当我在OAUTH Authorize脚本中查询用户时,我尝试使用:

from ..models import Auth, Company

# 'username' is provided by the auth script from the standard username/password OAuth fields

def get_user(db: Session, username: str):
    db_user_data = db.query(Auth.User).join(Company.Staffmember).filter(
        or_(
            Auth.User.username == username,
            Auth.User.email == username,
            Company.Staffmember.initial == username
        )
    ).first()

我得到一个异常:

(pymysql.err.ProgrammingError) (1146, "Table 'auth.staffmembers' doesn't exist")

我是否以正确的方式处理这整件事?有没有可能解决这个问题?

hvvq6cgz

hvvq6cgz1#

如果有人在这个问题上遇到困难,需要一个合理的答案,我把它放在SQLAlchemy Git讨论页面上,得到了一个相当理智的答案,帮助我解决了这个问题。
https://github.com/sqlalchemy/sqlalchemy/discussions/8027

总结:

  • 对于不同的数据库,您不需要多个引擎连接到同一个MySQL/mariadb服务器。您只需要在其中一个数据库上启动一个会话-请记住,这是数据库的名称,而不是Python SQA代码中的模型或模块。

我的新网站databases.py:

import dotenv
import os

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

import .models as models

dotenv.load_dotenv()

DBengine = create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/parts", pool_pre_ping=True, pool_recycle=300)

DBSession = sessionmaker(bind=DBengine autocommit=False, autoflush=False)

# Dependency

def getDb():
    db = DBSession()
    try:
        yield db
    finally:
        db.close()
  • 如果你使用上面的风格,你需要更明确地使用你的ForeignKey()语句,例如ForeignKey("<db>.<table>.<field>"),这样你就明确地告诉SQA在哪个数据库和表中查找每一个。
  • 您需要将数据库名称的名称作为模式添加到每个模型中,例如add __table_args__ = { "schema": "<database name>" }-请记住,这是数据库的名称,而不是Python SQA代码中的模型或模块。
新模块/Auth.py
from . import base as Base

from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, String, Numeric, Date, DateTime, ForeignKey, null, or_, and_

class User(Base):
    __tablename__ = 'users' #table is called 'users'
    __table_args__ = { "schema": "auth" } #database is called 'auth'

    id = Column(Integer, nullable=False, primary_key=True)
    username = Column(String(256), nullable=False)
    passhash = Column(String(512), nullable=False)
    email = Column(String, nullable=False)
    enabled = Column(Integer, nullable=True)
    staffmember_id = Column(Integer, nullable=False)

    staffmember = relationship("Company.Staffmember", uselist=False)
新/型号/Company.py
from . import base as Base

from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, String, Numeric, Date, DateTime, ForeignKey, null, or_, and_

class Staffmember(Base):
    __tablename__ = 'staffmembers' #table is called 'staffmembers'
    __table_args__ = { "schema": "company" } #database is called 'company'

    id = Column(Integer, ForeignKey("auth.users.staffmember_id"), nullable=False, primary_key=True)
    # ForeignKey now needs to know the database AND table name for the field it refers to
    order = Column(Integer, default=0, nullable=False)
    name = Column(String, nullable=True)
    initial = Column(String, nullable=True)
    email = Column(String, nullable=False)
    enabled = Column(Integer, default=0, nullable=False)

    relationship("Auth.User", back_populates="staffmember")

一旦使用了这个过程,SQA就知道在连接和关系中添加正确的数据库名称前缀,这样就可以正常工作了。

相关问题