python-3.x 使用sqlalchemy和flask进行多线程处理

am46iovg  于 2023-10-21  发布在  Python
关注(0)|答案(1)|浏览(186)

我有两个示例模型,如图所示。

class User(db.Model):
    id   = db.Column(....)
    name = db.Column(....)
    ....

class Transaction(db.Model)
    id = db.Column(....)
    user_id = db.Column(ForeignKey('User.id').....)
    ....

我有一个函数,它接受user_id作为参数。

def add_transaction_for_user(user_id: int):
    '''Does some processing, finds out if transactions
    are missing and adds them if needed.
    '''
    # The part of the question that needs answering Q1 #

    # DB operations querying
    foo = query_foo_for_user(user_id)

    # non DB operations
    missing_transactions = find_transactions_from_foo(foo.property) 
    
    if missing_transactions:

        # DB operations inserting 
        add_transactions(missing_transactions)
    ....

我有一个API路由,每天都被cronjob调用。

@some_blueprint.route('/transactions/autoadd', methods=['POST'])
def autoadd():
    '''Searches and adds missing transactions.
    '''
    all_user_ids: list[int] = query_all_users() # [1, 2, 3...]

    # The part of the question that needs answering Q2 #

我有1000多个用户,按顺序执行此操作将花费数小时,这就是为什么我需要使用多线程来优化工作流,因为每个用户都是独立的。我有几个问题
Q1.我可以简单地创建session = Session()一个scoped_session并将其传递给每个标记为#DB operations的函数吗?因为我不能把所有的底层代码放在一个函数中,我创建了一些小函数来完成特定的工作,但是没有一个例子作为参数传入会话,一旦工作完成,我必须在这个函数内部还是外部调用Session.remove()
Q2.我知道我必须使用类似于

executor = ThreadPoolExecutor(max_workers=4)
for user_id in all_user_ids:
    executor.submit(auto_add_transaction_for_user, user_id=user_id)

但是我应该在路由中创建一个会话,并为每个user_id传递executor.submit(...., session=s,....)吗?还是让每个线程创建自己的线程?
在这种情况下,使用多线程的最简单的例子是什么?
提前感谢所有的帮助,非常感谢。

aydmsdu9

aydmsdu91#

我会这样开始:
1.使用SQLAlchemy的scoped_session:要在多线程环境中使用SQLAlchemy,建议使用scoped sessionFlask-SQLAlchemyFlask的扩展,它提供了一个scoped_session,可以自动为每个线程创建一个新的会话,并在线程运行期间保持会话打开。

from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
  1. Flask应用程序配置:在Flask应用程序配置中设置threaded=True。这告诉Flask使用multiple threads for handling requests
app.config['SQLALCHEMY_DATABASE_URI'] = 'your_database_uri'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'pool_pre_ping': True  # Enable pre-ping to check for stale connections
}
app.app_context().push()  # Push an application context for thread-local data

相关问题