我有celery 片段如下:
首先在celery_tasks.tasks.py中定义任务类
import celery
class LoggerDefine(celery.Task):
name = 'message-logger'
def run(self, payload):
pass
class PerformanceMeasureDefine(celery.Task):
name = 'performance-logger'
def run(self, payload, elapseTime):):
pass
字符串
然后从consumers.logger中实现方法
import django
django.setup()
from log_models.models import Level_logs
from django.conf import settings
from common_functions.logger import *
from celery_tasks.tasks import LoggerDefine, PerformanceMeasureDefine
class LoggerImpl(LoggerDefine):
def run(self, payload):
fmt1 = 'Message body: {}'
print("-------------------------------- log persistor consuming --------------------------------")
Level_logs.objects.create(
level=payload.get(LEVEL),
date_time=payload.get(DATETIME),
file_name=payload.get(FILENAME),
line_number=payload.get(LINE_NUMBER),
function_name=payload.get(FUNCTION),
payload=payload.get(PAYLOAD),
)
return fmt1.format(payload)
class PerformanceMeasureImpl(PerformanceMeasureDefine):
def run(self, payload, elapseTime):
fmt1 = 'Message body: {}'
print("-------------------------------- performance consuming --------------------------------")
print("payload = ", payload)
print("elapseTime = ", elapseTime)
return fmt1.format(payload)
型
在celery_tasks.utils中创建celery工作者和消费者的函数
import celery
def create_worker_from(WorkerClass):
assert issubclass(WorkerClass, celery.Task)
app = celery.Celery()
app.config_from_object("django.conf:settings")
app.conf.update(task_default_queue=WorkerClass.name) # update worker queue
worker_task = app.register_task(WorkerClass())
return app, worker_task
型
其中django setting.py文件中的celery设置是:
CELERY_BROKER_URL = "amqp://guest:guest@rabbitmq:5672//"
型
最后,我将它们示例化为两个消费者
from consumers.logger import LoggerImpl, PerformanceMeasureImpl
from celery_tasks.utils import create_worker_from
logger, _ = create_worker_from(LoggerImpl)
logger.worker_main(["worker", "-c", "2"])
performance_measure, _ = create_worker_from(PerformanceMeasureImpl)
performance_measure.worker_main(["worker", "-c", "2"])
型
然后它不工作,我试图注解掉performance_measure并只留下logger如下:
from consumers.logger import LoggerImpl, PerformanceMeasureImpl
from celery_tasks.utils import create_worker_from
logger, _ = create_worker_from(LoggerImpl)
logger.worker_main(["worker", "-c", "2"])
#performance_measure, _ = create_worker_from(PerformanceMeasureImpl)
#performance_measure.worker_main(["worker", "-c", "2"])
型
然后logger工作正常。我还尝试注解掉logger,只留下performance_measure,如下所示:
from consumers.logger import LoggerImpl, PerformanceMeasureImpl
from celery_tasks.utils import create_worker_from
#logger, _ = create_worker_from(LoggerImpl)
#logger.worker_main(["worker", "-c", "2"])
performance_measure, _ = create_worker_from(PerformanceMeasureImpl)
performance_measure.worker_main(["worker", "-c", "2"])
型
那么performance_measure工作正常。
因此,只有一个消费者正常工作,但两个消费者在一起就不正常了。
当两个消息同时工作时,有一个错误和消息卡在消息队列中而没有消耗。
consumer: Cannot connect to amqp://guest:**@rabbitmq:5672//: [Errno 111] Connection refused.
型
顺便说一句,发布者的用法就像
_, logger_worker = create_worker_from(LoggerDefine)
_, performance_measure_worker = create_worker_from(PerformanceMeasureDefine)
logger_worker.apply_async(args=[msg,])
performance_measure_worker.apply_async(
args=[request.data, elapsed_time])
型
我怎么能让其中两个工作?
多谢了。
1条答案
按热度按时间v8wbuo2f1#
我对django不太熟悉,但是一个celery 消费者只能从一个工人阶级开始。为了更好地理解celery,在
--loglevel DEBUG
模式下启动worker。celery 术语及其含义概述:
1.工作者:侦听代理的长时间运行的进程,从代理获取消息,并在当前进程或其子进程中执行这些消息(子进程的数量可以用
concurrency
指定)1.task:celery worker在其注册表中注册的唯一名称(通常为导入名)的对象。在收到来自代理的消息后,celery首先检查任务的唯一字符串是否存在于其注册表中,并分配一个worker来处理它。
1.queue:Queue是Broker上的一个通道。在发送任务时,可以指定队列(默认为
default
),并将消息存储在代理的该通道上。多个队列可用于处理不同的优先级和工作负载。每个worker还可以侦听多个队列,并在消息到来时进行处理。1.经纪人:Broker存储消息,直到它们被任何工作进程处理。可以为不同的代理设置不同的配置。Client/Producer将任务序列化,并将这些消息发送到已定义队列上的代理。消息一直存储在代理上,直到侦听该队列的任何工作进程拾取它们。
可以添加任意数量的生产者和消费者,而无需了解彼此,因为代理负责存储有关活动队列、工作者和生产者的信息。
如果只使用单个broker,建议只使用一个
Celery
示例。这里,示例化了两个对象,而每个worker只需要一个这样的类。您可以为logger和performance分别创建两个文件,并在启动worker时在命令行上传递它们。或者,您可以在启动工作进程时根据命令行参数有条件地配置队列和注册任务。无论哪种方式,您都必须运行两个工作人员才能工作。