我想要的
我有一个外部生产者,它向RabbitMQ示例发送消息到某个队列。我想实现一个celery消费者,它监听这个队列,并在RabbitMQ收到这些队列的消息时立即启动一个任务。
到目前为止我所做的
我保证RabbitMQ实际上接收到消息并将其放入正确的Queue中。我还保证设置了到Broker的连接。(我可以执行Celery Tasks)
然而,当我试图实现的例子从文档预期的日志没有显示出来。
这是一个例子:
from my_proj.celery import app
from celery import bootsteps
class InfoStep(bootsteps.Step):
def __init__(self, parent, **kwargs):
# here we can prepare the Worker/Consumer object
# in any way we want, set attribute defaults, and so on.
print('{0!r} is in init'.format(parent))
def start(self, parent):
# our step is started together with all other Worker/Consumer
# bootsteps.
print('{0!r} is starting'.format(parent))
def stop(self, parent):
# the Consumer calls stop every time the consumer is
# restarted (i.e., connection is lost) and also at shutdown.
# The Worker will call stop at shutdown only.
print('{0!r} is stopping'.format(parent))
def shutdown(self, parent):
# shutdown is called by the Consumer at shutdown, it's not
# called by Worker.
print('{0!r} is shutting down'.format(parent))
app.steps['worker'].add(InfoStep)
app.steps['consumer'].add(InfoStep)
我用celery --app=my_proj.celery:app worker
启动worker。
我错过了什么?我理解错了吗?
编辑:
我尝试了另一种方法来处理bootsteps.ConsumerStep
,如下所示:
my_queue = Queue(
"my_queue",
Exchange("my_exchange"),
)
class MyConsumerStep(bootsteps.ConsumerStep):
def get_consumers(self, channel):
return [
Consumer(
channel,
queues=[my_queue],
callbacks=[self.handle_message],
accept=["json"],
)
]
def handle_message(self, body, message):
print("Received message: {0!r}".format(body))
message.ack()
app.steps["consumer"].add(MyConsumerStep)
我发现我可以通过调用celery --app=my_proj.celery:app controll add_consumer my_queue
来添加一个消费者,它实际上似乎在消费消息,但我希望启动消费者,而不需要专门调用队列的命令行命令。
另外,虽然消息被消费,但它们不会被我编写的消费者处理。相反,我得到以下错误:
[2022-05-31 12:33:44,720: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: (some details to the message)
1条答案
按热度按时间ufj5ltwl1#
我有一个类似的设置,但我在这里的函数名称略有不同:尝试使用 on_message 代替 handle_message