我正在为一个电话通讯写一个API,我的代码接受调度请求,当开始通讯时,它会向外部API发送一个请求,然后由外部API自己发送消息。如果你在docker之外运行所有东西,它工作得很好,但是当你在docker-compose中运行它时,src.newsletter.models.Dispatch.DoesNotExist:派工匹配查询不存在我很确定有这个指数的模型是存在的,但是celery给出了误差
tasks.py
from celery import shared_task
from src.newsletter.models import Dispatch, Client, Message
from config.settings.development import API_SENDING_URL, API_JWT_TOKEN
import requests
@shared_task
def send_message_to_client(dispatch_id, client_id):
dispatch = Dispatch.objects.get(id=dispatch_id)
client = Client.objects.get(id=client_id)
url = f"{API_SENDING_URL}{client.id}"
headers = {"Authorization": f"{API_JWT_TOKEN}"}
data = {
"id": client.id,
"phone": client.phone_number,
"text": dispatch.text_message,
}
try:
response = requests.post(url, headers=headers, json=data)
print(response.status_code)
if response.status_code == 200:
message = Message.objects.create(
status='S',
dispatch=dispatch,
client=client
)
elif response.status_code == 401:
message = Message.objects.create(
status='F',
dispatch=dispatch,
client=client
)
elif response.status_code == 400 or response.elapsed.total_seconds() > 10:
# Повторная попытка отправки
try:
response_retry = requests.post(url, headers=headers, json=data)
if response_retry.status_code == 200:
message = Message.objects.create(
status='S',
dispatch=dispatch,
client=client
)
else:
message = Message.objects.create(
status='F',
dispatch=dispatch,
client=client
)
except requests.exceptions.RequestException:
message = Message.objects.create(
status='F',
dispatch=dispatch,
client=client
)
else:
message = Message.objects.create(
status='F',
dispatch=dispatch,
client=client
)
except requests.exceptions.RequestException:
message = Message.objects.create(
status='F',
dispatch=dispatch,
client=client
)
except Exception as e:
message = Message.objects.create(
status='F',
dispatch=dispatch,
client=client
)
# Обработка ошибки при выполнении запроса
print(f"An error occurred: {str(e)}")
字符串
models.py
from django.db import models
from django.core.validators import MaxValueValidator, MinValueValidator, RegexValidator
from django.utils import timezone
class Client(models.Model):
id = models.AutoField(primary_key=True)
phone_number = models.CharField(
max_length=11,
validators=[
RegexValidator(
regex=r'^7[0-9]{10}$',
message='Phone number must be in the format 7XXXXXXXXXX'
)
]
)
# Все российские операторы имеют код от 900 до 997, в тз не указано нужны именно российские операторы или нет
# поэтому я ограничился этими кодами
mobile_operator_code = models.IntegerField(validators=[MinValueValidator(900), MaxValueValidator(997)])
tag = models.CharField(max_length=100)
timezone = models.CharField(max_length=255)
class Dispatch(models.Model):
id = models.AutoField(primary_key=True)
start_time = models.DateTimeField()
end_time = models.DateTimeField()
text_message = models.TextField()
client_filter = models.CharField(max_length=100)
def send(self):
from .tasks import send_message_to_client
clients = []
try:
if 900 <= int(self.client_filter) <= 997:
clients = Client.objects.filter(mobile_operator_code=int(self.client_filter))
except ValueError:
clients = Client.objects.filter(tag__contains=self.client_filter)
for client in clients:
send_message_to_client.delay(self.id, client.id)
class Message(models.Model):
STATUS_CHOICES = [
('S', 'Sent'),
('F', 'Failed'),
('P', 'Pending'),
]
id = models.AutoField(primary_key=True)
creation_time = models.DateTimeField(default=timezone.now)
status = models.CharField(max_length=1, choices=STATUS_CHOICES)
dispatch = models.ForeignKey(Dispatch, on_delete=models.CASCADE)
client = models.ForeignKey(Client, on_delete=models.CASCADE)
型
docker-compose.yml
version: '3'
services:
db:
image: postgres:latest
hostname: db
environment:
- POSTGRES_USER=solutionfactory
- POSTGRES_PASSWORD=solutionfactory
- POSTGRES_DB=solutionfactory
volumes:
- postgres_data:/var/lib/postgresql/data
rabbitmq:
restart: always
hostname: rabbitmq
image: rabbitmq:management
environment:
RABBITMQ_NODENAME: "rabbit@localhost"
RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG"
RABBITMQ_DEFAULT_USER: "rabbitmq"
RABBITMQ_DEFAULT_PASS: "rabbitmq"
RABBITMQ_DEFAULT_VHOST: "/"
ports:
- 15672:15672
- 5672:5672
web:
build: .
command: python manage.py runserver 0.0.0.0:8000
hostname: web
ports:
- 8000:8000
env_file:
- ./.env.dev
volumes:
- .:/code
links:
- "rabbitmq:test"
- db
- rabbitmq
depends_on:
- db
# networks:
# - my-network
# network_mode: "host"
celery-worker:
restart: always
build: .
working_dir: /code
volumes:
- .:/code
command: bash -c "
rm -rf /tmp/celery*.pid &
python -m celery -A config worker --beat --loglevel=info"
links:
- "rabbitmq:test"
- db
depends_on:
- rabbitmq
flower:
build: .
command: sh -c "sleep 30 && celery -A config.celery flower --address=0.0.0.0 --port=5555 --loglevel=info"
ports:
- 5555:5555
volumes:
- .:/code
links:
- "rabbitmq:test"
depends_on:
- rabbitmq
volumes:
postgres_data:
型
码头文件
FROM python:3.10.10
WORKDIR /solutionfactory-test-task
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
ENV PIP_ROOT_USER_ACTION=ignore
RUN pip install --upgrade pip
COPY Pipfile Pipfile.lock /solutionfactory-test-task/
RUN pip install pipenv && pipenv install --system
COPY . /solutionfactory-test-task/
RUN python manage.py migrate
RUN python manage.py collectstatic --no-input
# copy entrypoint.sh
COPY ./entrypoint.sh /entrypoint.sh
# copy project
COPY . .
# run entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]
型
错误讯息
task-celery-worker-1 | Traceback (most recent call last):
task-celery-worker-1 | File "/usr/local/lib/python3.10/site-packages/celery/app/trace.py", line 477, in trace_task
task-celery-worker-1 | R = retval = fun(*args, **kwargs)
task-celery-worker-1 | File "/usr/local/lib/python3.10/site-packages/celery/app/trace.py", line 760, in __protected_call__
task-celery-worker-1 | return self.run(*args, **kwargs)
task-celery-worker-1 | File "/code/src/newsletter/tasks.py", line 9, in send_message_to_client
task-celery-worker-1 | dispatch = Dispatch.objects.get(id=dispatch_id)
task-celery-worker-1 | File "/usr/local/lib/python3.10/site-packages/django/db/models/manager.py", line 87, in manager_method
task-celery-worker-1 | return getattr(self.get_queryset(), name)(*args, **kwargs)
solutionfactory-test-task-celery-worker-1 | File "/usr/local/lib/python3.10/site-packages/django/db/models/query.py", line 637, in get
task-celery-worker-1 | raise self.model.DoesNotExist(
task-celery-worker-1 | src.newsletter.models.Dispatch.DoesNotExist: Dispatch matching query does not exist.
型
我试着在论坛上找类似的东西,但我找不到
1条答案
按热度按时间vom3gejh1#
这是将Celery + DRF与原子事务一起使用时的常见问题。
通常情况下,在当前事务被提交到数据库之前,您就调用了celery任务,因此该任务可能在实际创建
Dispatch
对象之前运行。如果这确实是你问题所在,那么在将任务发送给celery之前确保事务已经提交的一个好方法是利用Django的transaction.on_commit()
方法:字符串