在对外部mysql数据库执行查询时,我看到了错误的查询结果,但只有在从heroku上运行的celery 任务进行连接时才看到。同样的任务,当在我自己的机器上运行时,不会显示这些错误,并且错误只出现大约一半的时间(尽管当它们失败时,所有任务都是错误的)。
这些任务由celery 通过redis管理,mysql数据库本身并不在heroku上运行。我的本地计算机和heroku都连接到同一个mysql数据库。
我用mysql连接数据库,用pymysql驱动程序,用;
DB_URI = 'mysql+pymysql://USER:PW@SERVER/DB'
engine = create_engine(stats_config.DB_URI, convert_unicode=True, echo_pool=True)
db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
Base = declarative_base()
Base.query = db_session.query_property()
任务一个接一个地执行。
下面是一个具有不同结果的任务示例:
@shared_task(bind=True, name="get_gross_revenue_task")
def get_gross_revenue_task(self, g_start_date, g_end_date, START_TIME_FORM):
db_session.close()
start_date = datetime.strptime(g_start_date, '%d-%m-%Y')
end_date = datetime.strptime(g_end_date, '%d-%m-%Y')
gross_rev_trans_VK = db_session.query(func.sum(UsersTransactionsVK.amount)).filter(UsersTransactionsVK.date_added >= start_date, UsersTransactionsVK.date_added <= end_date, UsersTransactionsVK.payed == 'Yes').scalar()
gross_rev_trans_Stripe = db_session.query(func.sum(UsersTransactionsStripe.amount)).filter(UsersTransactionsStripe.date_added >= start_date, UsersTransactionsStripe.date_added <= end_date, UsersTransactionsStripe.payed == 'Yes').scalar()
gross_rev_trans = db_session.query(func.sum(UsersTransactions.amount)).filter(UsersTransactions.date_added >= start_date, UsersTransactions.date_added <= end_date, UsersTransactions.on_hold == 'No').scalar()
if gross_rev_trans_VK is None:
gross_rev_trans_VK = 0
if gross_rev_trans_Stripe is None:
gross_rev_trans_Stripe = 0
if gross_rev_trans is None:
gross_rev_trans = 0
print ('gross', gross_rev_trans_VK, gross_rev_trans_Stripe, gross_rev_trans)
total_gross_rev = gross_rev_trans_VK + gross_rev_trans_Stripe + gross_rev_trans
return {'total_rev' : str(total_gross_rev / 100), 'current': 100, 'total': 100, 'statistic': 'get_gross_revenue', 'time_benchmark': (datetime.today() - START_TIME_FORM).total_seconds()}
# Selects gross revenue between selected dates
@app.route('/get-gross-revenue', methods=["POST"])
@basic_auth.required
@check_verified
def get_gross_revenue():
if request.method == "POST":
task = get_gross_revenue_task.apply_async([session['g_start_date'], session['g_end_date'], session['START_TIME_FORM']])
return json.dumps({}), 202, {'Location': url_for('taskstatus_get_gross_revenue', task_id=task.id)}
这些都是简单而快速的任务,在几秒钟内完成。
任务因产生微小差异而失败。例如,对于一个正确的结果为30111的任务,当事物中断时,该任务将产生29811。总是使用`db的代码
我尝试的是:
我已通过执行以下操作使用同一时区:
db_session.execute("SET SESSION time_zone = 'Europe/Berlin'")
我检查了工作日志中的错误。虽然有些条目
2013 Lost connection to MySQL
sqlalchemy.exc.ResourceClosedError: This result object does not return rows. It has been closed automatically
2014 commands out of sync
我还没有发现sql错误和错误结果之间的关联。错误的任务结果可能会在没有连接丢失的情况下出现。
一个非常糟糕的修复方法是硬编码其中一个任务的预期结果,首先执行该任务,然后在生成的结果不正确时重新提交所有内容。
这可能是我使用sqlalchemy会话的方式中的缓存或隔离级别问题。因为我只需要使用select(无插入或更新),所以在运行任务之前,我还尝试了不同的隔离级别设置,例如
# db_session.close()
# db_session.commit()
# db_session.execute('SET TRANSACTION READ ONLY')
当我在heroku上运行这些时,它们会显示一个错误,但是当我在windows机器上运行它们时,它们会工作。
我还试图改变与 'isolation_level="READ UNCOMMITTED'
,没有任何结果。
我确信工人们不会重复使用同样的东西 db_session
.
似乎只有使用 db_session
在查询中可能返回错误的结果。使用 query
属性 Base
基类(a) db_session.query_property()
对象,例如。 Users.query
)似乎没有问题。我以为基本上是一样的?
1条答案
按热度按时间ddarikpa1#
您正在不同工作区的任务之间使用会话。为每个celery 工人创建会话,甚至为每个任务创建会话。
要知道任务实际上是按每个工作人员持久化的。您可以使用它为每个任务缓存一个会话,这样就不必每次运行任务时都重新创建会话。使用自定义任务类最容易做到这一点;文档使用数据库连接缓存作为示例。
要使用sqlalchemy会话执行此操作,请使用:
将其用作:
这只会在当前任务需要一个sqlalchemy会话时为其创建一个sqlalchemy会话
self.session
.