尝试从RabbitMQ消费时出现线程问题

4si2a6ki  于 2023-10-20  发布在  RabbitMQ
关注(0)|答案(2)|浏览(153)

我有消费者代码:

class Consumer(threading.Thread):
    def __init__(self,rabbitMQUrl,dgraphUrl):
        super(JaqlConsumer, self).__init__()
        self.parameters = pika.URLParameters(rabbitMQUrl)

    def run(self):    
        self.connection = pika.BlockingConnection(self.parameters)
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange='publish', exchange_type='topic')
        result = self.channel.queue_declare('', exclusive=True)
        queue_name = result.method.queue
        self.channel.queue_bind(exchange='publish', queue=queue_name, routing_key='#')
        self.channel.basic_qos(prefetch_count=LIMIT)

    def process(values):
        print ("Process:" + str(len(values)))

    def on_message_callback(chan, method_frame, _header_frame, body, userdata=None):
        data = json.loads(body)
        self.values.append(data)
        if (len(self.values) >= LIMIT):
            process(self.values)
            self.values = []
            chan.basic_ack(delivery_tag=method_frame.delivery_tag,multiple=True)

    self.consumer_tag = self.channel.basic_consume(
        queue=queue_name, on_message_callback=on_message_callback)

    self.channel.start_consuming()

    def close(self):
        if hasattr(self, 'channel'):
            self.channel.basic_cancel(self.consumer_tag)
        if hasattr(self, 'connection'):
            if not self.connection.is_closed:
                self.connection.close()

这是我的 *. py。我正在尝试监听ZK节点,当值从false变为true时,我想从RabbitMQ消费,从True变为false时,我不想连接到RabbitMQ:

consumer = Consumer(brokerUrl)
consumer.setDaemon(True)
def toggleEnabled():
    # Get the enabled value from ZK and watch the next change
    isEnabled = config.get("enabled",enable_watch)
    print (isEnabled)
    if isEnabled:
        consumer = Consumer(brokerUrl,dgraphUrl)
        consumer.setDaemon(True)
        consumer.run()
    else:
        consumer.close()

def enable_watch(event):
    toggleEnabled()

toggleEnabled()

while True:
    time.sleep(1)

主要的问题是,在一次切换之后,切换代码不会运行,我认为这是因为当前线程是RabbitMQ的消费者(这是我暂停脚本时看到的)。从主线程切换到另一个线程的正确设计是什么?

ldioqlga

ldioqlga1#

根据我的经验,你迟早会在使用pika和多线程时遇到问题。我不确定你的实现细节和你是如何使用多线程的,但我也有这个问题使用皮卡,我会告诉你是什么帮助我解决这个问题。Pika不是线程安全的,所以如果你想在线程之间共享通道,你不能在Pika中使用多线程。这里有一个链接到github上关于这个主题的类似讨论:here
所以如果你想使用多线程和pika,我给你的建议是使用多处理或为每个线程使用一个新的连接。每个线程都有一个独立的连接。这不是有效的,但这是唯一的选择,现在我认为。

4c8rllxm

4c8rllxm2#

下面的代码应该是run()方法的一部分:

self.consumer_tag = self.channel.basic_consume(
    queue=queue_name, on_message_callback=on_message_callback)

self.channel.start_consuming()

当你把代码粘贴到问题中时,你犯了错误吗?
我建议将您的代码添加到GitHub存储库或gist中。然后,在pika-python邮件列表上提出您的问题,我将继续在那里提供帮助。堆栈溢出不是一个很好的地方来回援助。

相关问题