Celery连接被拒绝,RabbitMQ在Docker容器中运行

wwodge7n  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(2)|浏览(277)

我正在测试一个API,它将长时间运行的作业发送到由Celery worker处理的队列中。我正在使用运行在Docker容器中的RabbitMQ作为消息队列。但是,当向队列发送消息时,我收到以下错误:Error: [Errno 111] Connection refused
重现步骤:

  • 启动RabbitMQ容器:docker run -d -p 5672:5672 rabbitmq
  • 启动Celery服务器:celery -A celery worker --loglevel=INFO
  • 构建Docker映像:docker build -t fastapi .
  • 运行容器docker run -it -p 8000:8000 fastapi

停靠文件:

FROM python:3.9

WORKDIR /

COPY . .

RUN pip install --no-cache-dir --upgrade -r ./requirements.txt

EXPOSE 8000

CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]

requirements.txt:

anyio==3.6.1
asgiref==3.5.2
celery==5.2.7
click==8.1.3
colorama==0.4.4
fastapi==0.78.0
h11==0.13.0
httptools==0.4.0
idna==3.3
pydantic==1.9.1
python-dotenv==0.20.0
PyYAML==6.0
sniffio==1.2.0
starlette==0.19.1
typing_extensions==4.2.0
uvicorn==0.17.6
watchgod==0.8.2
websockets==10.3

app.py:

from fastapi import FastAPI
import tasks

@app.get("/{num}")
async def root(num):
  tasks.update_db.delay(num)
  return {"success": True}

tasks.py:

from celery import Celery
import time

celery = Celery('tasks', broker='amqp://')

@celery.task(name='update_db')
def update_db(num: int) -> None:
  time.sleep(30)
  return
mrphzbgm

mrphzbgm1#

您无法连接到localhost上的rabbitmq;它并没有运行在Python应用程序所在的容器中。既然你已经在你的主机上公开了rabbit,你就可以使用你的主机地址连接到它。一种方法是像这样启动应用程序容器:

docker run -it -p 8000:8000 --add-host host.docker.internal:host-gateway fastapi

然后像这样修改代码:

celery = Celery('tasks',  broker='amqp://host.docker.internal')

准备好这些代码后,让我们重新运行您的示例:

$ docker run -d -p 5672:5672 rabbitmq
$ docker run -d -p 8000:8000 --add-host host.docker.internal:host-gateway fastapi
$ curl http://localhost:8000/1
{"success":true}

如果你只需要从一个容器中访问rabbitmq端口,那么没有必要在你的主机上发布rabbitmq端口。当构建一个有多个容器的应用程序时,使用像docker-compose这样的工具可以让你的工作变得更容易。
如果使用了以下docker-compose.yaml

version: "3"

services:
  rabbitmq:
    image: rabbitmq

  app:
    build:
      context: .
    ports:
      - "8000:8000"

并修改了代码以连接到rabbitmq

celery = Celery('tasks',  broker='amqp://rabbitmq')

然后你可以运行docker-compose up来打开两个容器,你的应用程序会在主机端口8000上公开,但是rabbitmq只对你的应用程序容器可用。
顺便说一句,您可能希望从环境变量中获取代理uri,而不是在代码中硬编码代理uri:

celery = Celery('tasks',  broker=os.getenv('APP_BROKER_URI'))

这样就可以使用不同的连接字符串,而不必每次都重新构建映像。我们需要修改docker-compose.yaml以包含适当的变量:

version: "3"

services:
  rabbitmq:
    image: rabbitmq

  app:
    build:
      context: .
    environment:
      APP_BROKER_URI: "amqp://rabbitmq"
    ports:
      - "8000:8000"
8hhllhi2

8hhllhi22#

更新任务.py

import time

celery = Celery('tasks',  broker='amqp://user:pass@host:port//')

@celery.task(name='update_db')
def update_db(num: int) -> None:
  time.sleep(30)
  return

相关问题