如何使用Pika发送和接收RabbitMQ消息?

s1ag04yj  于 2022-12-23  发布在  RabbitMQ
关注(0)|答案(2)|浏览(245)

我在让Pika以与AMQP或RabbitMQ文档一致的方式处理路由密钥或交换时遇到了一些问题。我知道RabbitMQ文档使用的是旧版本的Pika,所以我忽略了它们的示例代码。
我尝试做的是定义一个队列,“订单”,并有两个消费者,一个处理交换或routing_key“生产”,一个处理“测试”。从RabbitMQ文档来看,通过使用直接交换和路由键或使用主题交换,应该很容易做到这一点。
然而,Pika似乎不知道如何处理交换和路由密钥,使用RabbitMQ管理工具检查队列,很明显,Pika要么没有正确地将消息排队,要么RabbitMQ只是将其丢弃。
在使用者方面,我不太清楚应该如何将使用者绑定到交换或处理路由密钥,文档也没有什么帮助。
如果我放弃所有的想法或交换和路由键,消息队列很好,很容易被我的消费者处理。
任何指针或示例代码的人会很好。

3lxsmp7m

3lxsmp7m1#

事实证明,我对AMQP的理解是不完整的。
其思路如下:

客户

客户端在获得连接后,除了交换机的名称和路由键之外,不应该关心其他任何东西,也就是说,我们不知道这将在哪个队列中结束。

channel.basic_publish(exchange='order',
                      routing_key="order.test.customer",
                      body=pickle.dumps(data),
                      properties=pika.BasicProperties(
                          content_type="text/plain",
                          delivery_mode=2))

消费者

当通道打开时,我们声明交换和队列

channel.exchange_declare(exchange='order', 
                         type="topic", 
                         durable=True, 
                         auto_delete=False)

channel.queue_declare(queue="test", 
                      durable=True, 
                      exclusive=False, 
                      auto_delete=False, 
                      callback=on_queue_declared)

当队列准备就绪时,在“on_queue_declarated”回调中是一个很好的地方,我们可以使用我们想要的路由键将队列绑定到交换机。

channel.queue_bind(queue='test', 
                   exchange='order', 
                   routing_key='order.test.customer')

#handle_delivery is the callback that will actually pickup and handle messages
#from the "test" queue
channel.basic_consume(handle_delivery, queue='test')

使用路由键“order.test.customer”发送到“order”交换机的消息现在将被路由到“test”队列,消费者可以在此提取它。

zlwx9yxi

zlwx9yxi2#

虽然Simon的答案总的来说似乎是正确的,但您可能需要交换参数以使用

channel.basic_consume(queue='test', on_message_callback=handle_delivery)

基本设置类似于

credentials = pika.PlainCredentials("some_user", "some_password")
parameters = pika.ConnectionParameters(
    "some_host.domain.tld", 5672, "some_vhost", credentials
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

要开始消耗:

channel.start_consuming()

相关问题