SQLite: running SQLAlchemy in multiple threads with APScheduler

Posted by l3zydbqr on 2023-05-29 in SQLite

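I have a list of jobs that I add to an APScheduler BlockingScheduler, with a ThreadPoolExecutor whose thread count equals the number of jobs.
The jobs use SQLAlchemy and all interact with the same database, but I get this error: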
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
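
For context, the message originates in SQLite rather than in SQLAlchemy: SQLite allows only one connection to write at a time, and a connection that cannot obtain the write lock within its timeout fails with this error. A minimal sketch using only the standard-library sqlite3 module reproduces it. This is not the asker's code; the table, the file name, and the function names are made up for illustration.

```python
import os
import sqlite3
import tempfile


def reproduce_lock(db_path: str) -> str:
    """Return the error message raised when a second writer is blocked."""
    # isolation_level=None puts each connection in autocommit mode, so the
    # explicit BEGIN IMMEDIATE below is the only transaction control in play.
    writer_a = sqlite3.connect(db_path, isolation_level=None)
    # timeout is how long a connection waits for a lock before giving up.
    writer_b = sqlite3.connect(db_path, isolation_level=None, timeout=0.1)
    try:
        writer_a.execute("CREATE TABLE IF NOT EXISTS users (user_name TEXT)")
        # Take the write lock and leave the transaction open.
        writer_a.execute("BEGIN IMMEDIATE")
        writer_a.execute("INSERT INTO users VALUES ('a')")
        try:
            # The second connection cannot get the write lock in time.
            writer_b.execute("INSERT INTO users VALUES ('b')")
        except sqlite3.OperationalError as exc:
            return str(exc)
        return "no error"
    finally:
        writer_a.close()
        writer_b.close()


def demo() -> str:
    # Use a throwaway database file so nothing is left behind.
    with tempfile.TemporaryDirectory() as tmp:
        return reproduce_lock(os.path.join(tmp, "demo.db"))
```

Calling demo() returns the string "database is locked", the same text that SQLAlchemy wraps in its OperationalError.
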
I use scoped_session and sessionmaker in my base SQLAlchemy setup:

from os import environ
from os.path import join, realpath

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker

# DB_NAME must be set in the environment; the file lives under ./data.
db_name = environ.get("DB_NAME")
db_path = realpath(join("data", db_name))
engine = create_engine(f"sqlite:///{db_path}", pool_pre_ping=True)
session_factory = sessionmaker(bind=engine)
# scoped_session hands out one Session per thread.
Session = scoped_session(session_factory)
Base = declarative_base()

An example of the scheduled job classes that I add to APScheduler:

from os import environ
from os.path import realpath
from typing import List

from app.data_structures.base import Base, Session, engine
from app.data_structures.job import Job
from app.data_structures.scheduled_job import ScheduledJob
from app.data_structures.user import User

# AppResources is defined elsewhere in the application; its import is not shown.


class AccountingProcessorJob(ScheduledJob):
    name: str = "Accounting Processor"

    def __init__(self, resources: AppResources, depends: List[str] = None) -> None:
        super().__init__(resources)

    def job_function(self) -> None:
        account_dir = realpath(environ.get("ACCOUNTING_DIRECTORY"))
        # Called on every run of every job.
        Base.metadata.create_all(engine, Base.metadata.tables.values(), checkfirst=True)
        session = Session()
        try:
            # do some stuff with the session here, e.g.
            # with some variables that are set up
            user = User(user_name=user_name)
            session.add(user)
            user.extend(jobs)
            session.commit()
        except:
            session.rollback()
        finally:
            # Discard this thread's session.
            Session.remove()

My impression was that using scoped_session with a session factory starts a new session for each thread and makes this thread-safe.
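
That impression holds as far as the Session objects go: by default, scoped_session keeps a registry keyed on the current thread. The sketch below is a standard-library stand-in for that idea, not SQLAlchemy's implementation; PerThreadRegistry and objects_seen_by_threads are hypothetical names used only here.

```python
import threading


class PerThreadRegistry:
    """Hand out one object per thread, created on first use in that thread."""

    def __init__(self, factory):
        self._factory = factory
        self._local = threading.local()

    def __call__(self):
        # Build the object lazily, then return the same one for this thread.
        if not hasattr(self._local, "value"):
            self._local.value = self._factory()
        return self._local.value

    def remove(self):
        # Forget this thread's object; the next call builds a fresh one.
        if hasattr(self._local, "value"):
            del self._local.value


def objects_seen_by_threads(registry, n_threads=3):
    """Return one (first, second) pair of registry results per thread."""
    seen = []
    lock = threading.Lock()

    def worker():
        first = registry()
        second = registry()  # same thread, so the same object comes back
        with lock:
            seen.append((first, second))

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return seen
```

Each thread gets its own object, and repeated calls inside one thread return that same object. Isolating the session objects does not serialize writes, though: each thread's session still reaches the same SQLite file through its own connection, so SQLite's single-writer lock still applies.
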
User and Job are SQLAlchemy models, for example:

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql.expression import false

from app.data_structures.base import Base
from app.data_structures.job import Job


class User(Base):
    __tablename__ = "users"

    user_name: Mapped[str] = mapped_column(primary_key=True)
    employee_number = Column(Integer)
    manager = relationship("User", remote_side=[user_name], post_update=True)
    jobs: Mapped[list[Job]] = relationship()

    def __init__(
        self,
        user_name: str,
        employee_number: int = None,
        manager: str = None,
    ) -> None:
        self.user_name = user_name
        self.employee_number = employee_number
        self.manager = manager

Can anyone explain what I am doing wrong and how to fix it?

Answer #1 (ffscu2ro)

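This example uses SQLAlchemy 1.4.48, APScheduler 3.10.1, and SQLite 3.34.1 as reported by Python's sqlite3 module.
I am not sure how to fully reproduce what is happening in your job function, because it is not clear where jobs, or the .extend in user.extend(jobs), comes from.
Calling create_all inside every job did not seem to work well (it raised "table already exists" errors), so I moved it into its own job that runs once, before the jobs that create users.
I use a random sleep so that the jobs run intermittently rather than in exact blocks the size of the maximum thread count.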

from os import environ
from time import sleep
from os.path import join, realpath
from secrets import randbelow

from sqlalchemy import create_engine, Integer, String
from sqlalchemy.schema import Column
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor

db_name = environ.get("DB_NAME")
db_path = realpath(join("data", db_name))
engine = create_engine(f"sqlite:///{db_path}", pool_pre_ping=True)
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)
Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String)

def setup_func():
    print("Running create_all")
    Base.metadata.create_all(engine, Base.metadata.tables.values(), checkfirst=True)

def job_function(i):
    print("Running job function {}".format(i))
    session = Session()
    try:
        seconds_to_wait = 1 + randbelow(4)
        sleep(seconds_to_wait)
        user = User(username=str(i))
        session.add(user)
        session.commit()
    finally:
        # Will rollback if needed.
        Session.remove()

FIFTEEN_MINUTES_IN_SECONDS = 15 * 60

def main():
    sched = BlockingScheduler(
        timezone="US/Pacific",
        executors={"default": ThreadPoolExecutor(4)},
        job_defaults={"misfire_grace_time": FIFTEEN_MINUTES_IN_SECONDS},
    )
    sched.add_job(setup_func)
    for i in range(100):
        sched.add_job(job_function, args=[i])
    sched.start()

if __name__ == "__main__":
    main()

Running the script against a fresh database file gives:

$ rm -f data/test.sql && DB_NAME=test.sql ve/bin/python test_apscheduler.py
/home/ian/workspace/laspilitas-project/stackoverflow-and-testing/test_apscheduler.py:18: MovedIn20Warning: Deprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to "sqlalchemy<2.0". Set environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings.  Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message. (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
  Base = declarative_base()
Running create_all
Running job function 0
Running job function 1
Running job function 2
Running job function 3
Running job function 4
Running job function 5
Running job function 6
Running job function 7
Running job function 8
Running job function 9
Running job function 10
Running job function 11
Running job function 12
Running job function 13
Running job function 14
Running job function 15
Running job function 16
Running job function 17
Running job function 18
Running job function 19
