我有两个示例模型,如图所示。
class User(db.Model):
id = db.Column(....)
name = db.Column(....)
....
class Transaction(db.Model)
id = db.Column(....)
user_id = db.Column(ForeignKey('User.id').....)
....
我有一个函数,它接受user_id作为参数。
def add_transaction_for_user(user_id: int):
'''Does some processing, finds out if transactions
are missing and adds them if needed.
'''
# The part of the question that needs answering Q1 #
# DB operations querying
foo = query_foo_for_user(user_id)
# non DB operations
missing_transactions = find_transactions_from_foo(foo.property)
if missing_transactions:
# DB operations inserting
add_transactions(missing_transactions)
....
我有一个API路由,每天都被cronjob调用。
@some_blueprint.route('/transactions/autoadd', methods=['POST'])
def autoadd():
'''Searches and adds missing transactions.
'''
all_user_ids: list[int] = query_all_users() # [1, 2, 3...]
# The part of the question that needs answering Q2 #
我有1000多个用户,按顺序执行此操作将花费数小时,这就是为什么我需要使用多线程来优化工作流,因为每个用户都是独立的。我有几个问题
Q1.我可以简单地创建session = Session()
一个scoped_session并将其传递给每个标记为#DB operations
的函数吗?因为我不能把所有的底层代码放在一个函数中,我创建了一些小函数来完成特定的工作,但是没有一个例子作为参数传入会话,一旦工作完成,我必须在这个函数内部还是外部调用Session.remove()
?
Q2.我知道我必须使用类似于
executor = ThreadPoolExecutor(max_workers=4)
for user_id in all_user_ids:
executor.submit(auto_add_transaction_for_user, user_id=user_id)
但是我应该在路由中创建一个会话,并为每个user_id传递executor.submit(...., session=s,....)
吗?还是让每个线程创建自己的线程?
在这种情况下,使用多线程的最简单的例子是什么?
提前感谢所有的帮助,非常感谢。
1条答案
按热度按时间aydmsdu91#
我会这样开始:
1.使用SQLAlchemy的
scoped_session
:要在多线程环境中使用SQLAlchemy,建议使用scoped session
。Flask-SQLAlchemy
是Flask
的扩展,它提供了一个scoped_session
,可以自动为每个线程创建一个新的会话,并在线程运行期间保持会话打开。threaded=True
。这告诉Flask使用multiple threads for handling requests
。