如何做一个简单的Pika SelectConnection来发送消息,在python?

jljoyd4f  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(3)|浏览(380)

我正在尝试转换我的代码,以通过Pika发送rabbitmq消息。我在理解如何使用异步连接(如SelectConnection)发送简单消息方面遇到了很多麻烦。
在我使用amqp库的旧代码中,我简单地声明了一个类,如下所示:

import amqp as amqp

class MQ():

    mqConn = None
    channel = None

    def __init__(self):
        self.connect()

    def connect(self):
        if self.mqConn is None:
            self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False)
            self.channel = self.mqConn.channel()

        elif not self.mqConn.connected:
            self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False)
            self.channel = self.mqConn.channel()

    def sendMQ(self, message):
        self.connect()
        lMessage = amqp.Message(message)
        self.channel.basic_publish(lMessage, exchange="DevMatrixE", routing_key="dev_matrix_q")

然后在代码的其他地方调用sendMQ(“这是我的消息”),然后代码继续运行,我不需要监听确认等。
有人可以编写一个简单的类,利用pika和SelectConnection,也可以使用sendMQ发送消息吗(“这是我的消息”)?我看过pika的例子,但我不知道如何绕过iloop和KeyboardInterrupt。我想我只是不知道如何让我的代码在没有所有这些try/except的情况下继续运行......此外,我不太确定我怎么能通过所有的回调传递我的信息。
任何帮助都是感激不尽的!

  • 谢谢-谢谢
1hdlvixo

1hdlvixo1#

整个事情是回调驱动的,因为它是一种异步的做事方式。异步消费者很容易理解,我们可以通过提供回调函数来获得消息。但是发布者部分有点难以理解,至少对于初学者来说是这样。
通常我们需要一个队列来进行通信,发布者定期从队列中获取数据。
使用SelectConnection的关键是将发布消息函数注册到事件循环中,这可以通过connection.add_timeout来完成。完成发布后,注册下一轮发布。
下一个问题是初始注册放在哪里。初始注册可以在通道打开回调中完成。
下面是一个代码片段,可帮助您更好地理解。请注意,它尚未准备好生产。因为它只能以每秒10条消息的最大速度发布消息。您需要调整发布间隔,并在一次回调中发布更多消息。

class MQ(Object):
    def __init___(self, queue):
        self.queue = queue
    def on_channel_open(self, chn):
        self.channel = chn
        self.connection.add_timeout(0.1, self.schedule_next_message)
    def schedule_next_message(self):
        try:
            msg = self.queue.get(True, 0.01)
            self.channel.basic_publish('YOUR EXCHANGE','YOUR ROUTING KEY',msg)
        except Queue.Empty:
            pass
        self.connection.add_timeout(0.1, self.schedule_next_message)
    def on_open(self, conn):
        self.connection = conn
        self.connection.channel(on_open_callback=self.on_channel_open)
    def run(self):
        # create a connection
        self.connection = pika.SelectConnection(pika.ConnectionParameters(heartbeat=600,host=args.mq_ip),self.on_open)
        try:
            self.connection.ioloop.start()
        except Exception:
            print("exception in publisher")
            self.connection.close()
            self.connection.ioloop.start()

把MQ(queue).run()放在一个单独的线程中,无论何时你想把消息放到mq中,只要把它放到queue对象中就行了。

zpf6vheq

zpf6vheq2#

我更新了来自TerrenceSun的代码,使其可以与最新版本的pika(当前版本为v1.3.0)一起工作,并且还添加了一个线程,以便所有内容都可以在一个自包含的类中工作:(注:必须按照Andrew的建议使用call_later)


# async_messenger.py : simple asynchronous rabbitmq message producer

# based on https://stackoverflow.com/questions/30332320/how-to-do-a-simple-pika-selectconnection-to-send-a-message-in-python

import os
import sys
import time
import traceback
import logging
import json
from optparse import OptionParser
import pika
import queue
import threading

'''
USAGE:
python async_messenger.py --debuglevel=1
cat ./async_messenger.log
'''

logger = logging.getLogger(__name__)

class AsyncMessenger:
    def __init__(self, debuglevel=0, queue=queue.Queue()):
        self.debuglevel = debuglevel

        if self.debuglevel > 0:
            print('AsyncMessenger: init debuglevel:',debuglevel)

        self.credentials = pika.PlainCredentials('guest','guest')
        self.parameters = pika.ConnectionParameters(host='localhost',
                                               port=5672,
                                               virtual_host='/',
                                               credentials=self.credentials,
                                               heartbeat=600)

        self.queue = queue
        self.exchange = 'YOUR EXCHANGE'
        self.routing_key = 'YOUR ROUTING KEY'
        self.msgThread = None

    # self.start -> (creates thread) -> self.run
    def run(self):
        print('AsyncMessenger: run')
        self.connection = pika.SelectConnection(parameters=self.parameters,
                                                on_open_callback=self.on_open)
        try:
            print('AsyncMessenger: run: connection.ioloop.start')
            self.connection.ioloop.start()
        except Exception as e:
            print("exception in publisher:",format(e))
            # traceback.print_exc(file=sys.stdout)
            self.connection.close()
            self.connection.ioloop.start()

    # run -> on_open
    def on_open(self, conn):
        print('AsyncMessenger: on_open')
        self.connection = conn
        self.connection.channel(on_open_callback=self.on_channel_open)

    # run -> on_open -> on_channel_open
    def on_channel_open(self, chn):
        print('AsyncMessenger: on_channel_open')
        self.channel = chn
        self.connection.ioloop.call_later(0.1, self.schedule_next_message)

    # run -> on_open -> on_channel_open -> schedule_next_message
    def schedule_next_message(self):
        if (self.debuglevel > 1): print('AsyncMessenger: schedule_next_message')
        try:
            msg = self.queue.get(True, 0.01)
            print('AsyncMessenger: queue msg:',msg)
            self.channel.basic_publish(self.exchange,self.routing_key,msg)
        except queue.Empty:
            pass
        self.connection.ioloop.call_later(0.1, self.schedule_next_message)

    def close(self):
        print('AsyncMessenger: close')
        self.connection.ioloop.stop()
        self.connection.close()

    # start our own self contained thread in class
    def start(self):
        print('AsyncMessenger: start')

        # function for worker thread
        def message_worker():
            self.run()

        # Turn-on the worker thread.
        self.msgThread = threading.Thread(target=message_worker, daemon=True)

        # start the threads
        self.msgThread.start()

def main():
    parser = OptionParser()

    parser.add_option("--debuglevel", action="store", type="int", \
                      nargs=1, dest="debuglevel", default=0)

    (options, args) = parser.parse_args()

    debuglevel = options.debuglevel

    log_file = './async_messenger.log'
    logging.basicConfig(filename=log_file, level=logging.INFO, format= \
               '%(name)s : %(asctime)s : Line: %(lineno)d - %(levelname)s  ::  %(message)s', \
               datefmt='%m/%d/%Y %I:%M:%S %p')
    logger = logging.getLogger(__name__)

    q = queue.Queue()

    asyncMessenger = AsyncMessenger(debuglevel, q)

    # Send task requests to the worker.
    for item in range(10):
        print('adding queue item:',item)
        # put a str so each item has len
        q.put(str(item))

    asyncMessenger.start()

    # keep checking queue, exit when empty
    while (q.qsize() > 0):
        time.sleep(1)

    asyncMessenger.close()

    # blocking wait for the threads to complete
    # Note: thread will wait forever unless we use: connection.ioloop.stop()
    asyncMessenger.msgThread.join()

    print('All work completed')

if __name__ == '__main__':
    main()

如果一切顺利,输出应该如下所示:

python async_messenger.py --debuglevel=1
AsyncMessenger: init debuglevel: 1
adding queue item: 0
adding queue item: 1
adding queue item: 2
adding queue item: 3
adding queue item: 4
adding queue item: 5
adding queue item: 6
adding queue item: 7
adding queue item: 8
adding queue item: 9
AsyncMessenger: start
AsyncMessenger: run
AsyncMessenger: run: connection.ioloop.start
AsyncMessenger: on_open
AsyncMessenger: on_channel_open
AsyncMessenger: queue msg: 0
AsyncMessenger: queue msg: 1
AsyncMessenger: queue msg: 2
AsyncMessenger: queue msg: 3
AsyncMessenger: queue msg: 4
AsyncMessenger: queue msg: 5
AsyncMessenger: queue msg: 6
AsyncMessenger: queue msg: 7
AsyncMessenger: queue msg: 8
AsyncMessenger: queue msg: 9
AsyncMessenger: close
All work completed
exdqitrt

exdqitrt3#

作为第一种方法,我建议你从文章末尾提供的pub/sub示例开始。一旦你理解了这个简单的示例,就可以开始按照末尾代码块之前提供的教程学习了。这个教程有6个不同的用例,用它的python例子。通过前5个步骤,你就会理解它的工作方式。你应该有清楚的交换概念(将消息路由到每个队列的实体)、绑定密钥(用于连接交换和队列的关键字),路由关键字(与来自发布者的消息沿着发送的密钥,交换使用该密钥将消息路由到一个队列或另一个队列)和队列(可以存储消息的缓冲器,可以具有多于1个(或者如果需要的话为1个)订户,并且可以从多于1个交换机获得消息,并且基于不同的绑定密钥)。此外,交换类型不止一种(扇出、主题(这一种可能是您需要的)...)。
如果这听起来很新鲜,请按照RabbitMQ提供的教程操作:
https://www.rabbitmq.com/tutorials/tutorial-one-python.html
pub.py:


# !/usr/bin/env python

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                  routing_key='hello',
                  body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()

sub.py:


# !/usr/bin/env python

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

channel.basic_consume(callback,
                  queue='hello',
                  no_ack=True)

channel.start_consuming()

相关问题