celery 如何在redis中存储任务结果

lvmkulzt  于 2023-01-25  发布在  Redis
关注(0)|答案(2)|浏览(230)

我是Python和Celery-Redis的新手,所以如果我的理解不正确,请纠正我。
我在调试一个代码库它的结构像-
TaskClass-〉celery 任务
HandlerClass1, HandlerClass2-〉这些是扩展Object类的python类
应用程序创建TaskClass,比如dumyTask示例,dumyTask创建celery 子任务(我相信这些子任务是唯一的),比如dumySubTask1, dumySubTask2,方法是获取处理程序的签名。
我不明白什么?
1)Celery是如何管理dumySubTask1, dumySubTask2dumyTask的结果的?我的意思是dumySubTask1dumySubTask2的结果应该被聚合并作为dumyTask的结果给出。
2)任务执行后,Celery如何在后端存储任务结果?我的意思是dumySubTask1dumySubTask2的结果是否会存储在后端,然后返回到dumyTask,然后dumyTask将结果返回到QUEUE(如果我说错了,请更正)
3)Celery是否以堆栈的方式维护任务和子任务?请参见快照。Task-SubTask Tree
任何指导都是非常感谢的。谢谢。

ruarlubt

ruarlubt1#

celery 工作者可以调用“任务”。2这个“任务”可以有“子任务”,它们可以“链接”在一起,即顺序调用。3“链接”是celery 画布指南中专门使用的术语。4然后将结果返回到redis中的队列。
celery 工人用于调用“独立任务”,主要用于“网络用例”,即“发送电子邮件”、“点击URL”

2q5ifsrm

2q5ifsrm2#

你需要从celery 示例中得到它
task = app_celery.AsyncResult(task_id)

完整示例-如下

我的celery_worker.py文件是:

import os
import time

from celery import Celery
from dotenv import load_dotenv

load_dotenv(".env")

celery = Celery(__name__)
celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL")
celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND")

@celery.task(name="create_task")
def create_task(a, b, c):
    print(f"Executing create_task it will take {a}")
    [print(i) for i in range(100)]
    time.sleep(a)
    return b + c

我正在使用FastAPI,我的端点是:

# To execute the task
@app.get("/sum")
async def root(sleep_time: int, first_number: int, second_number: int):
    process = create_task.delay(sleep_time, first_number, second_number)
    return {"process_id": process.task_id, "result": process.result}

# To get the task status and result
from celery_worker import create_task, celery
@app.get("/task/{task_id}")
async def check_task_status(task_id: str):
    task = celery.AsyncResult(task_id)
    return {"status": task.status, "result": task.result}

我的.env文件具有:

CELERY_BROKER_URL=redis://redis:6379/0
CELERY_RESULT_BACKEND=redis://redis:6379/0

相关问题