我正在编写一个Flask服务器,它有一个长计算“optimal_portfolio()”。我希望客户端发送一个请求,在后台开始计算,然后再发送另一个请求,以获得结果。我尝试使用Celery和Redis进行后台计算,但当我尝试访问Celery任务时,我只得到task.status = PENDING。
from portfolio_creator.portfolio_creator import optimal_portfolio
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from celery import Celery
import os
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = os.environ['REDISTOGO_URL']
app.config['CELERY_RESULT_BACKEND'] = os.environ['REDISTOGO_URL']
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
@celery.task
def calculate_optimal_portfolio(risk_coefficient):
return optimal_portfolio(risk_coefficient).to_json(orient="records")
@app.route('/portfolio-creator', methods=['POST'])
def set_portfolio():
risk_coefficient = request.form.get('risk_coefficient', 5)
task = calculate_optimal_portfolio.apply_async(
args=[risk_coefficient])
return jsonify({"task_id": task.id}), 202
@app.route('/portfolio-creator/<task_id>', methods=['GET'])
def get_portfolio(task_id):
result = celery.AsyncResult(task_id).result
if result is None:
return 'Portfolio not ready.', 404
return result, 200
if __name__ == '__main__':
app.run()
字符串
我也试过使用 flask 会话,但这似乎不适合celery 。
1条答案
按热度按时间vyswwuz21#
有一个类似的问题,我的任务正在执行,但状态总是
PENDING
。原来你需要配置celery来更新它的状态。我这样配置了我的 flask 应用程序,然后状态更新正确。
字符串
task_ignore_Result=False
是SUCCESS
状态显示所必需的。task_track_started
是STARTED
状态显示所必需的。更多关于他们的信息在这里:https://docs.celeryq.dev/en/stable/userguide/configuration.html。如果另一个阅读本文的人不使用flask,这个答案应该有助于配置celery而不使用flask https://stackoverflow.com/a/40831283/5915915