我正在尝试转换我的代码,以通过Pika发送rabbitmq消息。我在理解如何使用异步连接(如SelectConnection)发送简单消息方面遇到了很多麻烦。
在我使用amqp库的旧代码中,我简单地声明了一个类,如下所示:
import amqp as amqp
class MQ():
mqConn = None
channel = None
def __init__(self):
self.connect()
def connect(self):
if self.mqConn is None:
self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False)
self.channel = self.mqConn.channel()
elif not self.mqConn.connected:
self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False)
self.channel = self.mqConn.channel()
def sendMQ(self, message):
self.connect()
lMessage = amqp.Message(message)
self.channel.basic_publish(lMessage, exchange="DevMatrixE", routing_key="dev_matrix_q")
然后在代码的其他地方调用sendMQ(“这是我的消息”),然后代码继续运行,我不需要监听确认等。
有人可以编写一个简单的类,利用pika和SelectConnection,也可以使用sendMQ发送消息吗(“这是我的消息”)?我看过pika的例子,但我不知道如何绕过iloop和KeyboardInterrupt。我想我只是不知道如何让我的代码在没有所有这些try/except的情况下继续运行......此外,我不太确定我怎么能通过所有的回调传递我的信息。
任何帮助都是感激不尽的!
- 谢谢-谢谢
3条答案
按热度按时间1hdlvixo1#
整个事情是回调驱动的,因为它是一种异步的做事方式。异步消费者很容易理解,我们可以通过提供回调函数来获得消息。但是发布者部分有点难以理解,至少对于初学者来说是这样。
通常我们需要一个队列来进行通信,发布者定期从队列中获取数据。
使用SelectConnection的关键是将发布消息函数注册到事件循环中,这可以通过
connection.add_timeout
来完成。完成发布后,注册下一轮发布。下一个问题是初始注册放在哪里。初始注册可以在通道打开回调中完成。
下面是一个代码片段,可帮助您更好地理解。请注意,它尚未准备好生产。因为它只能以每秒10条消息的最大速度发布消息。您需要调整发布间隔,并在一次回调中发布更多消息。
把MQ(queue).run()放在一个单独的线程中,无论何时你想把消息放到mq中,只要把它放到queue对象中就行了。
zpf6vheq2#
我更新了来自TerrenceSun的代码,使其可以与最新版本的pika(当前版本为v1.3.0)一起工作,并且还添加了一个线程,以便所有内容都可以在一个自包含的类中工作:(注:必须按照Andrew的建议使用call_later)
如果一切顺利,输出应该如下所示:
exdqitrt3#
作为第一种方法,我建议你从文章末尾提供的pub/sub示例开始。一旦你理解了这个简单的示例,就可以开始按照末尾代码块之前提供的教程学习了。这个教程有6个不同的用例,用它的python例子。通过前5个步骤,你就会理解它的工作方式。你应该有清楚的交换概念(将消息路由到每个队列的实体)、绑定密钥(用于连接交换和队列的关键字),路由关键字(与来自发布者的消息沿着发送的密钥,交换使用该密钥将消息路由到一个队列或另一个队列)和队列(可以存储消息的缓冲器,可以具有多于1个(或者如果需要的话为1个)订户,并且可以从多于1个交换机获得消息,并且基于不同的绑定密钥)。此外,交换类型不止一种(扇出、主题(这一种可能是您需要的)...)。
如果这听起来很新鲜,请按照RabbitMQ提供的教程操作:
https://www.rabbitmq.com/tutorials/tutorial-one-python.html
pub.py:
sub.py: