rabbitmq celery 和定制消费者

fae0ux8s  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(4)|浏览(235)

据我所知,Celery同时充当消息的生产者和消费者。这不是我想要实现的。我希望Celery只充当消费者,根据我发送给我选择的AMQP代理的消息触发某些任务。这可能吗?
或者我需要把胡萝卜加到我的肉堆里来做汤吗?

6fe3ivhb

6fe3ivhb1#

Celery经纪人充当消息存储库,并将它们发布给订阅这些消息的一个或多个工作者,
所以:celery把消息传递给一个代理(rabbitmq,redist,celery本身通过djangodb,等等)这些消息由一个工作者按照代理的协议检索,并存储它们(通常它们是持久的,但可能取决于你的代理),然后由你的工作者执行。
任务结果可在正在执行的工作任务的上获得,您可以配置存储这些结果的位置,并可以使用此方法检索它们。
你可以发布任务,并向你的“接收器函数”传递参数(你定义的任务,文档有一些examples,通常你不想在这里传递大的东西(比如一个查询集),而只传递允许你在执行任务时检索所需内容的最小信息。
一个简单例子可以是:
注册任务

@task
def add(x,x):
    return x+y

并从另一个模块使用以下命令调用:

from mytasks import add

metadata1 = 1
metadata2 = 2
myasyncresult = add.delay(1,2)
myasyncresult.get() == 3

编辑

在你编辑之后,我看到你可能想从celery 以外的其他来源构造消息,你可以看到here的消息格式,它们默认为pickle对象,遵守这种格式,所以你把这些消息发布到你的rabbitmq代理的正确队列中,然后你就可以从你的工人那里检索它们了。

epggiuax

epggiuax2#

Celery使用message broker architectural pattern。许多实现/代理传输可以与Celery一起使用,包括RabbitMQDjango database
Wikipedia开始:
消息代理是一种用于消息验证、消息转换和消息路由的体系结构模式。它协调应用程序之间的通信,最大限度地减少应用程序为了能够交换消息而应该相互了解的情况,有效地实现解耦。
保留结果是可选的,需要结果后端。您可以使用不同的代理和结果后端。Celery入门指南包含更多信息。
你的问题的答案是是的你可以通过传递参数来触发特定的任务,而不需要向混合中添加Carrot

gdrx4gfi

gdrx4gfi3#

Celery Custom Consumer将是3.1v版本中发布的一个功能,目前正在开发中,您可以阅读http://docs.celeryproject.org/en/master/userguide/extending.html

neskvpey

neskvpey4#

为了消费celery 的信息,你需要创建celery 可以消费的信息。你可以创建celery 信息如下:-

def get_celery_worker_message(task_name,args,kwargs,routing_key,id,exchange=None,exchange_type=None):

    message=(args, kwargs, None)

    application_headers={
        'lang': 'py',
        'task': task_name,
        'id':id,
        'argsrepr': repr(args),
        'kwargsrepr': repr(kwargs)
        #, 'origin': '@'.join([os.getpid(), socket.gethostname()])
    }
    properties={
        'correlation_id':id,
        'content_type': 'application/json',
        'content_encoding': 'utf-8',
    }

    body, content_type, content_encoding = prepare(
            message, 'json', 'application/json', 'utf-8',None, application_headers)

    prep_message = prepare_message(body,None,content_type,content_encoding,application_headers,properties)

    inplace_augment_message(prep_message, exchange, exchange_type, routing_key,id)

    # dump_json = json.dumps(prep_message)

    # print(f"json encoder:- {dump_json}")

    return prep_message

您需要通过首先定义序列化程序、content_type、content_encoding、compression和基于使用者的标头来准备消息。

def prepare( body, serializer=None, content_type=None,
                 content_encoding=None, compression=None, headers=None):

        # No content_type? Then we're serializing the data internally.
        if not content_type:
            serializer = serializer
            (content_type, content_encoding,
             body) = dumps(body, serializer=serializer)
        else:
            # If the programmer doesn't want us to serialize,
            # make sure content_encoding is set.
            if isinstance(body, str):
                if not content_encoding:
                    content_encoding = 'utf-8'
                body = body.encode(content_encoding)

            # If they passed in a string, we can't know anything
            # about it. So assume it's binary data.
            elif not content_encoding:
                content_encoding = 'binary'

        if compression:
            body, headers['compression'] = compress(body, compression)

        return body, content_type, content_encoding

def prepare_message( body, priority=None, content_type=None,
                        content_encoding=None, headers=None, properties=None):
        """Prepare message data."""
        properties = properties or {}
        properties.setdefault('delivery_info', {})
        properties.setdefault('priority', priority )

        return {'body': body,
                'content-encoding': content_encoding,
                'content-type': content_type,
                'headers': headers or {},
                'properties': properties or {}}

一旦创建了消息,您需要添加参数,使celery 消费者可读。

def inplace_augment_message(message, exchange,exchange_type, routing_key,next_delivery_tag):
    body_encoding_64 = 'base64'

    message['body'], body_encoding = encode_body(
        str(json.dumps(message['body'])), body_encoding_64
    )
    props = message['properties']
    props.update(
        body_encoding=body_encoding,
        delivery_tag=next_delivery_tag,
    )
    if exchange and exchange_type:
        props['delivery_info'].update(
            exchange=exchange,
            exchange_type=exchange_type,
            routing_key=routing_key,
        )
    elif exchange:
        props['delivery_info'].update(
            exchange=exchange,
            routing_key=routing_key,
        )
    else:
        props['delivery_info'].update(
            exchange=None,
            routing_key=routing_key,
        )

class Base64:

    """Base64 codec."""

    def encode(self, s):
        return bytes_to_str(base64.b64encode(str_to_bytes(s)))

    def decode(self, s):
        return base64.b64decode(str_to_bytes(s))

def encode_body( body, encoding=None):
    codecs = {'base64': Base64()}
    if encoding:
        return codecs.get(encoding).encode(body), encoding
    return body, encoding

相关问题