rabbitmq Django model.save()执行不一致的更新

bwleehnv  于 12个月前  发布在  RabbitMQ
关注(0)|答案(1)|浏览(222)

我使用django ORM在RabbitMQ消费者的回调函数中与MySQL数据库通信。这些消费者运行在单独的线程上,每个消费者都建立了自己的连接到其队列。
下面是我的两个消费者回调的代码:

任务执行服务

# imports
from pika.spec import Basic
from pika.channel import Channel
from pika import BasicProperties

import uuid

from jobs.models import Task

from exceptions import MasterConsumerServiceError as ServiceError

from .master_service import MasterConsumerSerivce

class TaskExecutorService(MasterConsumerSerivce):
  queue = 'master_tasks'

  @classmethod
  def callback(cls, ch: Channel, method: Basic.Deliver, properties: BasicProperties, message: dict):
    # get task
    task_id_str = message.get('task_id')
    task_id = uuid.UUID(task_id_str)
    task_qs = Task.objects.filter(pk=task_id)
    if not task_qs.exists():
      raise ServiceError(message=f'Task {task_id_str} does not exist')
    task = task_qs.first()

    # check if task is stopped
    if task.status == cls.Status.TASK_STOPPED:
      raise ServiceError(message=f'Task {task_id_str} is stopped')

    # send task to results queue
    publisher = cls.get_publisher(queue=cls.Queues.results_queue)
    published, error = publisher.publish(message=message | {'status': True, 'error': None})
    if not published:
      raise ServiceError(message=str(error))

    # update task status
    task.status = cls.Status.TASK_PROCESSING
    task.save()

    return

字符串

ResultsHandlerService

# imports
from pika.spec import Basic
from pika.channel import Channel
from pika import BasicProperties

import uuid

from jobs.models import Task
from exceptions import MasterConsumerServiceError as ServiceError

from .master_service import MasterConsumerSerivce

class ResultHandlerService(MasterConsumerSerivce):
  queue = 'master_results'

  @classmethod
  def callback(cls, ch: Channel, method: Basic.Deliver, properties: BasicProperties, message: dict):
    # get task
    task_id_str = message.get('task_id')
    task_id = uuid.UUID(task_id_str)
    task_qs = Task.objects.filter(pk=task_id)
    if not task_qs.exists():
      raise ServiceError(message=f'Task {task_id_str} does not exist')
    task = task_qs.first()

    # get result data and status
    data = message.get('data')
    status = message.get('status')

    # if task is not successful
    if not status:
      # fail task
      task.status = cls.Status.TASK_FAILED
      task.save()

      # fail job
      task.job.status = cls.Status.JOB_FAILED
      task.job.save()

      return

    # update task status
    task.status = cls.Status.TASK_DONE
    task.save()

    # check if job is complete
    task_execution_order = task.process.execution_order
    next_task_qs = Task.objects.select_related('process').filter(job=task.job, process__execution_order=task_execution_order + 1)
    is_job_complete = not next_task_qs.exists()

    # check job is complete
    if is_job_complete:
      # publish reults
      publisher = cls.get_publisher(queue=cls.Queues.output_queue)
      published, error = publisher.publish(message={'job_id': str(task.job.id), 'data': data})
      if not published:
        raise ServiceError(message=str(error))

      # update job status
      task.job.status = cls.Status.JOB_DONE
      task.job.save()

    # otherwise
    else:
      # publish next task
      next_task = next_task_qs.first()
      publisher = cls.get_publisher(queue=cls.Queues.tasks_queue)
      published, error = publisher.publish(message={'task_id': str(next_task.id), 'data': data})
      if not published:
        raise ServiceError(message=str(error))

      # update next task status
      next_task.status = cls.Status.TASK_QUEUED
      next_task.save()

    return


问题是,无论我在哪里使用:

task.status = cls.Status.TASK_ABC
task.save()


结果行为非常不稳定。有时候一切正常,所有状态都按预期更新,但大多数情况下,即使流程流按预期完成,输出队列也会填充结果,状态也不会更新。如果我在执行task.save()后记录任务状态,记录的状态也是我期望看到的,但数据库内的值永远不会更新。
我会很乐意提供更多的代码,如果需要的话。请帮助我解决这个问题。

0sgqnhkj

0sgqnhkj1#

这被称为“竞争条件”。你有两个不同的线程同时修改同一个对象。
在某个时间点,它们都将具有陈旧的数据。
也就是说,当线程B更改并保存对象时,线程A的数据变得陈旧。此时,如果线程A保存对象,那么它将把陈旧的数据(旧值)保存到数据库中。因此,数据将恢复到最初的状态。
另外,线程可以以任何顺序执行,这就是为什么你会看到这种不稳定的行为。
这是因为当数据库中的数据发生变化时,Django不会自动更新模型示例对象。这就是数据变得陈旧的原因。
解决竞态条件的方法是使用锁。
现在,我不太理解你的代码中发生的所有事情,所以我不能给予你一个解决方案作为代码示例。但是你可以通过使用select_for_update和原子事务来解决你的问题。

相关问题