rabbitmq FastAPI、兔MQ、celery :密码有什么问题吗?

sr4lhrrt  于 2023-02-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(164)

FastAPI应用程序:

import fastapi as _fastapi
from celery import Celery
from celery.result import AsyncResult

app = _fastapi.FastAPI()

celery_app = Celery(
    "worker",
    broker_url="amqp://guest:guest@rabbit:5672//",
    result_backend="rpc://",
)
celery_app.conf.task_routes = {"celery_worker.test_celery": "test-queue"}
celery_app.conf.update(task_track_started=True)

@app.get("/{word}")
async def root(word: str):
    task = celery_app.send_task("celery_worker.test_celery", args=[word])
    return {"message": "Word received", "id": f"{task}"}

@app.get("/api/result/{task_id}")
async def result(task_id: str):
    task = AsyncResult(task_id)
    # Task Not Ready
    if not task.ready():
        return {"status": task.status}
    # Task done: return the value
    task_result= task.get()
    result = task_result.get("result")
    return {"task_id": str(task_id),
            "status": task_result.get("status"),
            "result": result,
    }

停靠文件:

FROM python:3.10-slim
WORKDIR /app
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
COPY ./requirements.txt .
RUN pip install --upgrade pip && pip install -r requirements.txt --no-cache-dir
COPY . .

docker-compose.yml:

version: '3.8'
services:
  ylab:
    container_name: ylab
    build:
      context: .
    command: "uvicorn main:app --reload --host 0.0.0.0"
    ports:
      - "8000:8000"
    networks:
      - api_network
  rabbit:
    container_name: rabbit
    image: rabbitmq:3.10.7-management
    ports:
      - "15672:15672"
      - "5672:5672"
    networks:
      - api_network
  celery_worker:
    container_name: celery_worker
    build:
      context: .
    command: celery -A main.celery_app worker --loglevel=INFO
    networks:
      - api_network

networks:
  api_network:
    name: api_network

root()函数运行良好,我可以发送消息、返回任务ID,并查看RabbitMQ队列中的所有消息,但是任何任务ID的result()函数都返回task.ready()== False
谁能告诉我这段代码中的错误是什么?

  • 服务信息:*
  • 家兔MQ 3.10.7 *
  • celery *
  • celery @415bde516932 v5.2.3(黎明合唱)*
  • Linux操作系统-5.10.0 - 18-amd64-x86_64-带有-glibc2. 31版本2023年2月5日12时02分49秒 *
  • 应用程序:工作人员:0x7f3679306c20 *
  • 运输:amqp://客人:**@兔子:5672//*
  • 结果:区域方案协调会://*
  • 并发性:8(分叉前)*
  • 任务事件:OFF(启用-E以监视此工作进程中的任务)*
  • [队列]*
  • .〉celery 交换=celery (直接)键=celery *
bpsygsoo

bpsygsoo1#

根据task_track_started的文档:
如果为True,则当辅助进程执行任务时,任务将报告其状态为“已启动”。
但是在你的代码中,你似乎没有任何东西消耗你放在队列中的任务,它们将永远处于PENDING状态。
我首先编写代码来使用自动任务路由,使用<func>.delay而不是底层的send_task方法来调用任务:

import time

import fastapi as _fastapi
from celery import Celery
from celery.result import AsyncResult

app = _fastapi.FastAPI()

celery_app = Celery(
    "worker",
    broker_url="amqp://guest:guest@rabbit:5672//",
    result_backend="rpc://",
)
celery_app.conf.update(task_track_started=True)

@celery_app.task
def test_celery(word):
    time.sleep(10)
    return word.upper()

@app.get("/{word}")
async def root(word: str):
    task = test_celery.delay(word)
    return {"message": "Word received", "id": f"{task}"}

@app.get("/api/result/{task_id}")
async def result(task_id: str):
    task = AsyncResult(task_id)

    # Task Not Ready
    if not task.ready():
        return {"status": task.status}

    # Task done: return the value
    task_result= task.get()
    return {"task_id": str(task_id),
            "result": task_result,
    }

运行上述代码时,连接到/foo会导致:

{"message":"Word received","id":"34bfe48d-6ab3-4dec-ad7d-aa567315a609"}

/api/result/34bfe48d-6ab3-4dec-ad7d-aa567315a609的后续调用将生成:

{"status":"STARTED"}

如果我们等待10秒钟,同样的请求会导致:

{"task_id":"34bfe48d-6ab3-4dec-ad7d-aa567315a609","result":"FOO"}

我们已经演示了在使用自动任务路由时可以正常工作。那么,为什么您的原始代码不能正常工作呢?有三个问题:
1.你没有任何东西看test-queue
您正在将任务交付到test-queue中,但是Celery工作线程正在监视默认的celery队列,您需要使用-Q参数来让它监视test-queue

celery_worker:
  container_name: celery_worker
  build:
    context: .
  command: celery -A main.celery_app worker --loglevel=INFO -Q test-queue
  networks:
    - api_network

1.您没有定义任何任务。
如果添加上一步中的-Q test-queue参数并重新启动环境,尝试连接到/foo将导致Celery工作线程中出现以下回溯:

celery_worker    | [2023-02-05 14:12:40,864: ERROR/MainProcess] Received unregistered task of type 'celery_worker.test_celery'.
celery_worker    | The message has been ignored and discarded.
[...]
celery_worker    | Traceback (most recent call last):
celery_worker    |   File "/usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py", line 591, in on_task_received
celery_worker    |     strategy = strategies[type_]
celery_worker    | KeyError: 'celery_worker.test_celery'

我们可以通过向Celery注册相应的任务来解决这个问题:

@celery_app.task(name="celery_worker.test_celery")
def test_celery(word):
    time.sleep(10)
    return word.upper()

1.通过前面的两个更改,您的代码将成功地将任务提交给Celery,Celery将把它传递给test_celery函数。但是,对/api/result/<id>的调用将失败,并显示:

File "/app/./main.py", line 39, in result
    result = task_result.get("result")
AttributeError: 'str' object has no attribute 'get'

您需要修改result函数,使其看起来更像:

@app.get("/api/result/{task_id}")
async def result(task_id: str):
    task = AsyncResult(task_id)

    # Task Not Ready
    if not task.ready():
        return {"status": task.status}

    # Task done: return the value
    task_result = task.get()
    return {
        "task_id": str(task_id),
        "result": task_result,
    }

通过这三个更改,原始代码可以正常工作。修改后的完整代码如下所示:

import time

import fastapi
from celery import Celery
from celery.result import AsyncResult

app = fastapi.FastAPI()

celery_app = Celery(
    "worker",
    broker_url="amqp://guest:guest@rabbit:5672//",
    result_backend="rpc://",
)
celery_app.conf.task_routes = {"celery_worker.test_celery": "test-queue"}
celery_app.conf.update(task_track_started=True)

@celery_app.task(name="celery_worker.test_celery")
def test_celery(word):
    time.sleep(10)
    return word.upper()

@app.get("/{word}")
async def root(word: str):
    task = celery_app.send_task("celery_worker.test_celery", args=[word])
    return {"message": "Word received", "id": f"{task}"}

@app.get("/api/result/{task_id}")
async def result(task_id: str):
    task = AsyncResult(task_id)

    # Task Not Ready
    if not task.ready():
        return {"status": task.status}

    # Task done: return the value
    task_result = task.get()
    return {
        "task_id": str(task_id),
        "result": task_result,
    }

相关问题