rabbitmq Celery在执行任务时并没有找到模特

kt06eoxx  于 2023-08-05  发布在  RabbitMQ
关注(0)|答案(1)|浏览(115)

我正在为一个电话通讯写一个API,我的代码接受调度请求,当开始通讯时,它会向外部API发送一个请求,然后由外部API自己发送消息。如果你在docker之外运行所有东西,它工作得很好,但是当你在docker-compose中运行它时,src.newsletter.models.Dispatch.DoesNotExist:派工匹配查询不存在我很确定有这个指数的模型是存在的,但是celery给出了误差
tasks.py

from celery import shared_task
from src.newsletter.models import Dispatch, Client, Message
from config.settings.development import API_SENDING_URL, API_JWT_TOKEN
import requests

@shared_task
def send_message_to_client(dispatch_id, client_id):
    dispatch = Dispatch.objects.get(id=dispatch_id)
    client = Client.objects.get(id=client_id)

    url = f"{API_SENDING_URL}{client.id}"
    headers = {"Authorization": f"{API_JWT_TOKEN}"}
    data = {
        "id": client.id,
        "phone": client.phone_number,
        "text": dispatch.text_message,
    }

    try:
        response = requests.post(url, headers=headers, json=data)
        print(response.status_code)
        if response.status_code == 200:
            message = Message.objects.create(
                status='S',
                dispatch=dispatch,
                client=client
            )
        elif response.status_code == 401:
            message = Message.objects.create(
                status='F',
                dispatch=dispatch,
                client=client
            )
        elif response.status_code == 400 or response.elapsed.total_seconds() > 10:
            # Повторная попытка отправки
            try:
                response_retry = requests.post(url, headers=headers, json=data)
                if response_retry.status_code == 200:
                    message = Message.objects.create(
                        status='S',
                        dispatch=dispatch,
                        client=client
                    )
                else:
                    message = Message.objects.create(
                        status='F',
                        dispatch=dispatch,
                        client=client
                    )
            except requests.exceptions.RequestException:
                message = Message.objects.create(
                    status='F',
                    dispatch=dispatch,
                    client=client
                )
        else:
            message = Message.objects.create(
                status='F',
                dispatch=dispatch,
                client=client
            )
    except requests.exceptions.RequestException:
        message = Message.objects.create(
            status='F',
            dispatch=dispatch,
            client=client
        )
    except Exception as e:
        message = Message.objects.create(
            status='F',
            dispatch=dispatch,
            client=client
        )
        # Обработка ошибки при выполнении запроса
        print(f"An error occurred: {str(e)}")

字符串
models.py

from django.db import models
from django.core.validators import MaxValueValidator, MinValueValidator, RegexValidator
from django.utils import timezone

class Client(models.Model):
    id = models.AutoField(primary_key=True)
    phone_number = models.CharField(
        max_length=11,
        validators=[
            RegexValidator(
                regex=r'^7[0-9]{10}$',
                message='Phone number must be in the format 7XXXXXXXXXX'
            )
        ]
    )
    # Все российские операторы имеют код от 900 до 997, в тз не указано нужны именно российские операторы или нет
    # поэтому я ограничился этими кодами
    mobile_operator_code = models.IntegerField(validators=[MinValueValidator(900), MaxValueValidator(997)])
    tag = models.CharField(max_length=100)
    timezone = models.CharField(max_length=255)

class Dispatch(models.Model):
    id = models.AutoField(primary_key=True)
    start_time = models.DateTimeField()
    end_time = models.DateTimeField()
    text_message = models.TextField()
    client_filter = models.CharField(max_length=100)

    def send(self):
        from .tasks import send_message_to_client
        clients = []
        try:
            if 900 <= int(self.client_filter) <= 997:
                clients = Client.objects.filter(mobile_operator_code=int(self.client_filter))
        except ValueError:
            clients = Client.objects.filter(tag__contains=self.client_filter)
        for client in clients:
            send_message_to_client.delay(self.id, client.id)

class Message(models.Model):
    STATUS_CHOICES = [
        ('S', 'Sent'),
        ('F', 'Failed'),
        ('P', 'Pending'),
    ]
    id = models.AutoField(primary_key=True)
    creation_time = models.DateTimeField(default=timezone.now)
    status = models.CharField(max_length=1, choices=STATUS_CHOICES)
    dispatch = models.ForeignKey(Dispatch, on_delete=models.CASCADE)
    client = models.ForeignKey(Client, on_delete=models.CASCADE)


docker-compose.yml

version: '3'
services:
  db:
    image: postgres:latest
    hostname: db
    environment:
      - POSTGRES_USER=solutionfactory
      - POSTGRES_PASSWORD=solutionfactory
      - POSTGRES_DB=solutionfactory
    volumes:
      - postgres_data:/var/lib/postgresql/data

  rabbitmq:
    restart: always
    hostname: rabbitmq
    image: rabbitmq:management
    environment:
      RABBITMQ_NODENAME: "rabbit@localhost"
      RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG"
      RABBITMQ_DEFAULT_USER: "rabbitmq"
      RABBITMQ_DEFAULT_PASS: "rabbitmq"
      RABBITMQ_DEFAULT_VHOST: "/"
    ports:
      - 15672:15672
      - 5672:5672

  web:
    build: .
    command: python manage.py runserver 0.0.0.0:8000
    hostname: web
    ports:
      - 8000:8000
    env_file:
      - ./.env.dev
    volumes:
      - .:/code
    links:
      - "rabbitmq:test"
      - db
      - rabbitmq
    depends_on:
      - db
#    networks:
#      - my-network
#    network_mode: "host"

  celery-worker:
    restart: always
    build: .
    working_dir: /code
    volumes:
      - .:/code
    command: bash -c "
      rm -rf /tmp/celery*.pid &
      python -m celery -A config worker --beat --loglevel=info"

    links:
      - "rabbitmq:test"
      - db
    depends_on:
      - rabbitmq

  flower:
    build: .
    command: sh -c "sleep 30 && celery -A config.celery flower --address=0.0.0.0 --port=5555 --loglevel=info"
    ports:
      - 5555:5555
    volumes:
      - .:/code
    links:
      - "rabbitmq:test"
    depends_on:
      - rabbitmq

volumes:
  postgres_data:


码头文件

FROM python:3.10.10

WORKDIR /solutionfactory-test-task

ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
ENV PIP_ROOT_USER_ACTION=ignore

RUN pip install --upgrade pip

COPY Pipfile Pipfile.lock /solutionfactory-test-task/
RUN pip install pipenv && pipenv install --system

COPY . /solutionfactory-test-task/

RUN python manage.py migrate
RUN python manage.py collectstatic --no-input

# copy entrypoint.sh
COPY ./entrypoint.sh /entrypoint.sh
# copy project
COPY . .
# run entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]


错误讯息

task-celery-worker-1  | Traceback (most recent call last):
task-celery-worker-1  |   File "/usr/local/lib/python3.10/site-packages/celery/app/trace.py", line 477, in trace_task
task-celery-worker-1  |     R = retval = fun(*args, **kwargs)
task-celery-worker-1  |   File "/usr/local/lib/python3.10/site-packages/celery/app/trace.py", line 760, in __protected_call__
task-celery-worker-1  |     return self.run(*args, **kwargs)
task-celery-worker-1  |   File "/code/src/newsletter/tasks.py", line 9, in send_message_to_client
task-celery-worker-1  |     dispatch = Dispatch.objects.get(id=dispatch_id)
task-celery-worker-1  |   File "/usr/local/lib/python3.10/site-packages/django/db/models/manager.py", line 87, in manager_method
task-celery-worker-1  |     return getattr(self.get_queryset(), name)(*args, **kwargs)
solutionfactory-test-task-celery-worker-1  |   File "/usr/local/lib/python3.10/site-packages/django/db/models/query.py", line 637, in get
task-celery-worker-1  |     raise self.model.DoesNotExist(
task-celery-worker-1  | src.newsletter.models.Dispatch.DoesNotExist: Dispatch matching query does not exist.


我试着在论坛上找类似的东西,但我找不到

vom3gejh

vom3gejh1#

这是将Celery + DRF与原子事务一起使用时的常见问题。
通常情况下,在当前事务被提交到数据库之前,您就调用了celery任务,因此该任务可能在实际创建Dispatch对象之前运行。如果这确实是你问题所在,那么在将任务发送给celery之前确保事务已经提交的一个好方法是利用Django的transaction.on_commit()方法:

from django.db import transaction

transaction.on_commit(lambda: send_message_to_client.delay(self.id, client.id))

字符串

相关问题