为RabbitMQ实现Celery消费者

oyt4ldly  于 2023-04-20  发布在  RabbitMQ
关注(0)|答案(1)|浏览(184)

我想要的

我有一个外部生产者,它向RabbitMQ示例发送消息到某个队列。我想实现一个celery消费者,它监听这个队列,并在RabbitMQ收到这些队列的消息时立即启动一个任务。

到目前为止我所做的

我保证RabbitMQ实际上接收到消息并将其放入正确的Queue中。我还保证设置了到Broker的连接。(我可以执行Celery Tasks)
然而,当我试图实现的例子从文档预期的日志没有显示出来。
这是一个例子:

from my_proj.celery import app
from celery import bootsteps

class InfoStep(bootsteps.Step):

    def __init__(self, parent, **kwargs):
        # here we can prepare the Worker/Consumer object
        # in any way we want, set attribute defaults, and so on.
        print('{0!r} is in init'.format(parent))

    def start(self, parent):
        # our step is started together with all other Worker/Consumer
        # bootsteps.
        print('{0!r} is starting'.format(parent))

    def stop(self, parent):
        # the Consumer calls stop every time the consumer is
        # restarted (i.e., connection is lost) and also at shutdown.
        # The Worker will call stop at shutdown only.
        print('{0!r} is stopping'.format(parent))

    def shutdown(self, parent):
        # shutdown is called by the Consumer at shutdown, it's not
        # called by Worker.
        print('{0!r} is shutting down'.format(parent))

app.steps['worker'].add(InfoStep)
app.steps['consumer'].add(InfoStep)

我用celery --app=my_proj.celery:app worker启动worker。
我错过了什么?我理解错了吗?

编辑:

我尝试了另一种方法来处理bootsteps.ConsumerStep,如下所示:

my_queue = Queue(
    "my_queue",
    Exchange("my_exchange"),
)

class MyConsumerStep(bootsteps.ConsumerStep):
    def get_consumers(self, channel):
        return [
            Consumer(
                channel,
                queues=[my_queue],
                callbacks=[self.handle_message],
                accept=["json"],
            )
        ]

    def handle_message(self, body, message):
        print("Received message: {0!r}".format(body))
        message.ack()

app.steps["consumer"].add(MyConsumerStep)

我发现我可以通过调用celery --app=my_proj.celery:app controll add_consumer my_queue来添加一个消费者,它实际上似乎在消费消息,但我希望启动消费者,而不需要专门调用队列的命令行命令。
另外,虽然消息被消费,但它们不会被我编写的消费者处理。相反,我得到以下错误:

[2022-05-31 12:33:44,720: WARNING/MainProcess] Received and deleted unknown message.  Wrong destination?!?
The full contents of the message body was: (some details to the message)
ufj5ltwl

ufj5ltwl1#

我有一个类似的设置,但我在这里的函数名称略有不同:尝试使用 on_message 代替 handle_message

my_queue = Queue(
    "my_queue",
    Exchange("my_exchange"),
)

class MyConsumerStep(bootsteps.ConsumerStep):
    def get_consumers(self, channel):
        return [
            Consumer(
                channel,
                queues=[my_queue],
                callbacks=[self.handle_message],
                accept=["json"],
            )
        ]

    def on_message(self, body, message):
        print("Received message: {0!r}".format(body))
        message.ack()

app.steps["consumer"].add(MyConsumerStep)

相关问题