redis Flask中的Celery任务未完成

pbossiut  于 2023-08-02  发布在  Redis
关注(0)|答案(1)|浏览(134)

我正在编写一个Flask服务器,它有一个长计算“optimal_portfolio()”。我希望客户端发送一个请求,在后台开始计算,然后再发送另一个请求,以获得结果。我尝试使用Celery和Redis进行后台计算,但当我尝试访问Celery任务时,我只得到task.status = PENDING。

from portfolio_creator.portfolio_creator import optimal_portfolio
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from celery import Celery
import os

app = Flask(__name__)

app.config['CELERY_BROKER_URL'] = os.environ['REDISTOGO_URL']
app.config['CELERY_RESULT_BACKEND'] = os.environ['REDISTOGO_URL']

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def calculate_optimal_portfolio(risk_coefficient):
    return optimal_portfolio(risk_coefficient).to_json(orient="records")

@app.route('/portfolio-creator', methods=['POST'])
def set_portfolio():
    risk_coefficient = request.form.get('risk_coefficient', 5)

    task = calculate_optimal_portfolio.apply_async(
        args=[risk_coefficient])

    return jsonify({"task_id": task.id}), 202

@app.route('/portfolio-creator/<task_id>', methods=['GET'])
def get_portfolio(task_id):
    result = celery.AsyncResult(task_id).result

    if result is None:
        return 'Portfolio not ready.', 404

    return result, 200

if __name__ == '__main__':

    app.run()

字符串
我也试过使用 flask 会话,但这似乎不适合celery 。

vyswwuz2

vyswwuz21#

有一个类似的问题,我的任务正在执行,但状态总是PENDING
原来你需要配置celery来更新它的状态。我这样配置了我的 flask 应用程序,然后状态更新正确。

app.config.from_mapping(
    CELERY=dict(
        broker_url=REDIS_URL,
        result_backend=REDIS_URL,
        task_ignore_result=False,
        task_track_started=True,
    )
)

字符串
task_ignore_Result=FalseSUCCESS状态显示所必需的。task_track_startedSTARTED状态显示所必需的。
更多关于他们的信息在这里:https://docs.celeryq.dev/en/stable/userguide/configuration.html。如果另一个阅读本文的人不使用flask,这个答案应该有助于配置celery而不使用flask https://stackoverflow.com/a/40831283/5915915

相关问题