FastAPI应用程序:
import fastapi as _fastapi
from celery import Celery
from celery.result import AsyncResult
app = _fastapi.FastAPI()
celery_app = Celery(
"worker",
broker_url="amqp://guest:guest@rabbit:5672//",
result_backend="rpc://",
)
celery_app.conf.task_routes = {"celery_worker.test_celery": "test-queue"}
celery_app.conf.update(task_track_started=True)
@app.get("/{word}")
async def root(word: str):
task = celery_app.send_task("celery_worker.test_celery", args=[word])
return {"message": "Word received", "id": f"{task}"}
@app.get("/api/result/{task_id}")
async def result(task_id: str):
task = AsyncResult(task_id)
# Task Not Ready
if not task.ready():
return {"status": task.status}
# Task done: return the value
task_result= task.get()
result = task_result.get("result")
return {"task_id": str(task_id),
"status": task_result.get("status"),
"result": result,
}
停靠文件:
FROM python:3.10-slim
WORKDIR /app
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
COPY ./requirements.txt .
RUN pip install --upgrade pip && pip install -r requirements.txt --no-cache-dir
COPY . .
docker-compose.yml:
version: '3.8'
services:
ylab:
container_name: ylab
build:
context: .
command: "uvicorn main:app --reload --host 0.0.0.0"
ports:
- "8000:8000"
networks:
- api_network
rabbit:
container_name: rabbit
image: rabbitmq:3.10.7-management
ports:
- "15672:15672"
- "5672:5672"
networks:
- api_network
celery_worker:
container_name: celery_worker
build:
context: .
command: celery -A main.celery_app worker --loglevel=INFO
networks:
- api_network
networks:
api_network:
name: api_network
root()函数运行良好,我可以发送消息、返回任务ID,并查看RabbitMQ队列中的所有消息,但是任何任务ID的result()函数都返回task.ready()== False
谁能告诉我这段代码中的错误是什么?
- 服务信息:*
- 家兔MQ 3.10.7 *
- celery *
- celery @415bde516932 v5.2.3(黎明合唱)*
- Linux操作系统-5.10.0 - 18-amd64-x86_64-带有-glibc2. 31版本2023年2月5日12时02分49秒 *
- 应用程序:工作人员:0x7f3679306c20 *
- 运输:amqp://客人:**@兔子:5672//*
- 结果:区域方案协调会://*
- 并发性:8(分叉前)*
- 任务事件:OFF(启用-E以监视此工作进程中的任务)*
- [队列]*
- .〉celery 交换=celery (直接)键=celery *
1条答案
按热度按时间bpsygsoo1#
根据
task_track_started
的文档:如果为True,则当辅助进程执行任务时,任务将报告其状态为“已启动”。
但是在你的代码中,你似乎没有任何东西消耗你放在队列中的任务,它们将永远处于PENDING状态。
我首先编写代码来使用自动任务路由,使用
<func>.delay
而不是底层的send_task
方法来调用任务:运行上述代码时,连接到
/foo
会导致:对
/api/result/34bfe48d-6ab3-4dec-ad7d-aa567315a609
的后续调用将生成:如果我们等待10秒钟,同样的请求会导致:
我们已经演示了在使用自动任务路由时可以正常工作。那么,为什么您的原始代码不能正常工作呢?有三个问题:
1.你没有任何东西看
test-queue
。您正在将任务交付到
test-queue
中,但是Celery工作线程正在监视默认的celery
队列,您需要使用-Q
参数来让它监视test-queue
:1.您没有定义任何任务。
如果添加上一步中的
-Q test-queue
参数并重新启动环境,尝试连接到/foo
将导致Celery工作线程中出现以下回溯:我们可以通过向Celery注册相应的任务来解决这个问题:
1.通过前面的两个更改,您的代码将成功地将任务提交给Celery,Celery将把它传递给
test_celery
函数。但是,对/api/result/<id>
的调用将失败,并显示:您需要修改
result
函数,使其看起来更像:通过这三个更改,原始代码可以正常工作。修改后的完整代码如下所示: