SQLAlchemy + MySQL / MariaDB: bulk upsert with composite keys

eagi6jfj  posted on 2023-01-17  in Mysql

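Using SQLAlchemy with a MariaDB backend, I need to bulk upsert data (insert new rows, update existing ones). Following this answer, I got it working for models with a single primary key. However, I can't get it to work with composite keys.
The key part of the code is this: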

# for single pk
primary_key = [key.name for key in inspect(model).primary_key][0]
# get all entries to be updated
for each in DBSession.query(model).filter(getattr(model, primary_key).in_(entries.keys())).all():
    entry = entries.pop(str(getattr(each, primary_key)))

I tried to modify it so that it works with composite keys:

primary_keys = tuple([key.name for key in inspect(model).primary_key])
# get all entries to be updated
for each in DBSession.query(model).filter(and_(*[getattr(model, col).in_(entries.keys()) for col in primary_keys])).all():
    print("This is never printed :(")

My guess is that DBSession.query(model).filter(and_(*[getattr(model, col).in_(entries.keys()) for col in primary_keys])).all() does not work as expected.
Here is a complete code snippet for reference:

from sqlalchemy import Column, create_engine, and_, or_
from sqlalchemy.types import String
from sqlalchemy.inspection import inspect
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy import inspect, tuple_

DBSession = scoped_session(sessionmaker())

Base = declarative_base()

class Accounts(Base):
    __tablename__ = 'accounts'
    account  = Column(String(50), primary_key=True)
    comment  = Column(String(50))

class Users(Base):
    __tablename__ = 'users'
    user  = Column(String(50), primary_key=True)
    account  = Column(String(50), primary_key=True)
    comment  = Column(String(50))

accounts_data = {"account1": {"account": "account1", "comment": "test"}, "account2": {"account": "account2", "comment": None}}

users_data = {("user1", "account1"): {"user": "user1", "account": "account1", "comment": ""}, ("user1", "account2"): {"user": "user1", "account": "account2", "comment": ""}}

def upsert_data_single_pk(entries, model):
    primary_key = [key.name for key in inspect(model).primary_key][0]
    entries_to_update = []
    entries_to_insert = []
    
    # get all entries to be updated
    for each in DBSession.query(model).filter(getattr(model, primary_key).in_(entries.keys())).all():
        entry = entries.pop(str(getattr(each, primary_key)))
        entries_to_update.append(entry)
        
    # get all entries to be inserted
    for entry in entries.values():
        entries_to_insert.append(entry)

    DBSession.bulk_insert_mappings(model, entries_to_insert)
    DBSession.bulk_update_mappings(model, entries_to_update)

    DBSession.commit()

def upsert_data_multiple_pk(entries, model):
    primary_keys = tuple([key.name for key in inspect(model).primary_key])
    entries_to_update = []
    entries_to_insert = []
    
    # get all entries to be updated
    for each in DBSession.query(model).filter(and_(*[getattr(model, col).in_(entries.keys()) for col in primary_keys])).all():
        # Print the composite primary key value by concatenating the values of the individual columns
        print('-'.join([str(getattr(each, col)) for col in primary_keys]))
        
    # get all entries to be inserted
    for entry in entries.values():
        entries_to_insert.append(entry)

    DBSession.bulk_insert_mappings(model, entries_to_insert)
    DBSession.bulk_update_mappings(model, entries_to_update)

    DBSession.commit()

db_connection_uri = "mysql+pymysql://XXXX:XXXX@XXXX:XXXX/XXXX?charset=utf8mb4"
engine = create_engine(db_connection_uri, echo=False)
DBSession.remove()
DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)

#Base.metadata.drop_all(engine, checkfirst=True)
Base.metadata.create_all(bind=engine)

#upsert_data_single_pk(accounts_data, Accounts)
upsert_data_multiple_pk(users_data, Users)

z5btuh9x1#

I wrote a different function that does what I need:

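from typing import Any, Dict, List, Type

# MySQL/MariaDB-specific insert(): provides .inserted and .on_duplicate_key_update()
from sqlalchemy.dialects.mysql import insert

# Method of a class that holds `self.engine`; `Base`, `logger` and
# `ModelTools.get_primary_keys` are defined elsewhere and are not shown here.
def upsert(self, model: Type[Base], data: List[Dict[str, Any]]) -> None:
    """Upsert records into the database.

    If a record already exists, it will be updated. If it does not exist, it will be inserted.

    Parameters:
        model: The SQLAlchemy model representing the table.
        data: The data to be inserted or updated, as a list of dictionaries.
    """
    if not data:
        logger.info("No data to insert")
        return None
    logger.info(f"{len(data)} rows to insert/update to {model.__table__}")
    insert_stmt = insert(model.__table__).values(data)
    primary_keys = ModelTools.get_primary_keys(model)
    # On a duplicate key, overwrite every non-key column with the incoming value
    to_update = {
        k: getattr(insert_stmt.inserted, k)
        for k in data[0].keys()
        if k not in primary_keys
    }

    on_conflict_stmt = insert_stmt.on_duplicate_key_update(**to_update)
    # Engine.execute() was removed in SQLAlchemy 2.0; engine.begin() commits on success
    with self.engine.begin() as conn:
        conn.execute(on_conflict_stmt)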
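
ModelTools.get_primary_keys is not shown above. The following is a minimal sketch of what such a helper might look like, together with a way to inspect the SQL that the statement produces without connecting to a database. The names get_primary_keys and build_upsert are hypothetical stand-ins, the Users model is copied from the question, and SQLAlchemy 1.4 or newer is assumed.

```python
from sqlalchemy import Column, String, inspect
from sqlalchemy.dialects import mysql
from sqlalchemy.dialects.mysql import insert
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Users(Base):
    # Same composite-key model as in the question
    __tablename__ = "users"
    user = Column(String(50), primary_key=True)
    account = Column(String(50), primary_key=True)
    comment = Column(String(50))


def get_primary_keys(model):
    """Hypothetical stand-in for ModelTools.get_primary_keys."""
    return [col.name for col in inspect(model).primary_key]


def build_upsert(model, data):
    """Build INSERT ... ON DUPLICATE KEY UPDATE for a list of row dicts."""
    insert_stmt = insert(model.__table__).values(data)
    primary_keys = get_primary_keys(model)
    # Every non-key column is overwritten with the value of the incoming row
    to_update = {
        k: insert_stmt.inserted[k]
        for k in data[0].keys()
        if k not in primary_keys
    }
    return insert_stmt.on_duplicate_key_update(**to_update)


rows = [
    {"user": "user1", "account": "account1", "comment": "a"},
    {"user": "user1", "account": "account2", "comment": "b"},
]
stmt = build_upsert(Users, rows)
# Compile against the MySQL dialect only to look at the SQL; nothing is executed
sql = str(stmt.compile(dialect=mysql.dialect()))
print(sql)
```

Because the database decides per row whether to insert or update based on the primary key (or any unique index), the composite key needs no special handling on the Python side.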

It may not be the most time-efficient approach, but it works as expected, so I'm keeping it for now.
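
For completeness, the query-based approach from the question can probably be made to work as well. The attempt there compares each key column on its own against whole (user, account) tuples, while a composite key has to be matched as one unit. SQLAlchemy's tuple_() construct expresses that as a row-value IN. The sketch below assumes SQLAlchemy 1.4 or newer and uses an in-memory SQLite database purely as a stand-in so that it runs without a MariaDB server; the explicit session argument and the returned counts are additions for illustration.

```python
from sqlalchemy import Column, String, create_engine, inspect, tuple_
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Users(Base):
    # Same composite-key model as in the question
    __tablename__ = "users"
    user = Column(String(50), primary_key=True)
    account = Column(String(50), primary_key=True)
    comment = Column(String(50))


def upsert_data_multiple_pk(session, entries, model):
    """entries maps a primary-key tuple (in column order) to a row dict."""
    pk_columns = [getattr(model, col.name) for col in inspect(model).primary_key]
    entries = dict(entries)  # copy, so the caller's dict is left untouched
    entries_to_update = []

    if entries:
        # Match the key columns as one tuple: (user, account) IN ((..., ...), ...)
        existing = (
            session.query(*pk_columns)
            .filter(tuple_(*pk_columns).in_(list(entries.keys())))
            .all()
        )
        for row in existing:
            entries_to_update.append(entries.pop(tuple(row)))

    # Whatever is left was not found in the table and has to be inserted
    entries_to_insert = list(entries.values())

    session.bulk_insert_mappings(model, entries_to_insert)
    session.bulk_update_mappings(model, entries_to_update)
    session.commit()
    return len(entries_to_insert), len(entries_to_update)


# Stand-in database (assumption): in-memory SQLite instead of MariaDB
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = Session(engine)

first = upsert_data_multiple_pk(session, {
    ("user1", "account1"): {"user": "user1", "account": "account1", "comment": ""},
    ("user1", "account2"): {"user": "user1", "account": "account2", "comment": ""},
}, Users)

second = upsert_data_multiple_pk(session, {
    ("user1", "account1"): {"user": "user1", "account": "account1", "comment": "changed"},
    ("user2", "account1"): {"user": "user2", "account": "account1", "comment": "new"},
}, Users)

updated_comment = (
    session.query(Users.comment)
    .filter_by(user="user1", account="account1")
    .scalar()
)
```

This variant needs one extra SELECT before writing, so the single-statement ON DUPLICATE KEY UPDATE version is likely the simpler choice on MySQL and MariaDB.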
