Python使用RabbitMQ并运行SocketIO服务器

w1e3prcc  于 2023-08-05  发布在  RabbitMQ
关注(0)|答案(1)|浏览(130)

设置

我有一个python应用程序,它应该使用来自RabbitMQ的消息,并充当Vue2应用程序的SocketIO服务器。当它从RabbitMQ收到消息时,它应该通过SocketIO向Vue2应用程序发送消息。因此我写了两个类RabbitMQHandlerSocketIOHandler。我在一个单独的线程中启动RabbitMQHandler,以便RabbitMQ消费和wsgi服务器可以并行运行。

编码

import random
import threading
import socketio
import eventlet
import sys
import os
import uuid
import pika
from dotenv import load_dotenv
import logging

class RabbitMQHandler():
    def __init__(self, RABBITMQ_USER, RABBITMQ_PW, RABBITMQ_IP):
        self.queue_name = 'myqueue'
        self.exchange_name = 'myqueue'
        credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PW)
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_IP, 5672, '/', credentials))
        self.channel = self.connection.channel()

        self.channel.queue_declare(queue=self.queue_name)
        self.channel.exchange_declare(exchange=self.exchange_name, exchange_type='fanout')
        self.channel.queue_bind(exchange=self.exchange_name, queue=self.queue_name)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.connection.close()

    def run(self, callback):
        logging.info('start consuming messages...')
        self.channel.basic_consume(queue=self.queue_name,auto_ack=True, on_message_callback=callback)
        self.channel.start_consuming()

class SocketIOHandler():
    def __init__(self):
        self.id = str(uuid.uuid4())
        # create a Socket.IO server
        self.sio = socketio.Server(async_mode='eventlet', cors_allowed_origins='*')
        # wrap with a WSGI application
        self.app = socketio.WSGIApp(self.sio)

        self.sio.on('connect_to_backend', self.handle_connect)
        self.sio.on('random_number', self.handle_random_number)

    def handle_connect(self, sid, msg):
        logging.info('new socket io message')
        self.emit('connect_success', {
            'success': True,
        })

    def handle_random_number(self, sid, msg):
        logging.info('handle_random_number')
        self.emit('response_random_number', { 'number': random.randint(0,10)})

    def emit(self, event, msg):
        logging.info('socket server: {}'.format(self.id))
        logging.info('sending event: "{}"'.format(event))
        self.sio.emit(event, msg)
        logging.info('sent event: "{}"'.format(event))

    def run(self):
        logging.info('start web socket on port 8765...')
        eventlet.wsgi.server(eventlet.listen(('', 8765)), self.app)

def start_rabbitmq_handler(socketio_handler, RABBITMQ_USER, RABBITMQ_PW, RABBITMQ_IP):
    def callback(ch, method, properties, body):
        logging.info('rabbitmq handler')
        socketio_handler.emit('response_random_number', { 'number': random.randint(0,10)})

    with RabbitMQHandler(RABBITMQ_USER, RABBITMQ_PW, RABBITMQ_IP) as rabbitmq_handler:
        rabbitmq_handler.run(callback=callback)

threads = []

def main():
    global threads
    load_dotenv()
    RABBITMQ_USER = os.getenv('RABBITMQ_USER')
    RABBITMQ_PW = os.getenv('RABBITMQ_PW')
    RABBITMQ_IP = os.getenv('RABBITMQ_IP')

    socketio_handler = SocketIOHandler()
    rabbitmq_thread = threading.Thread(target=start_rabbitmq_handler, args=(socketio_handler, RABBITMQ_USER, RABBITMQ_PW, RABBITMQ_IP))
    threads.append(rabbitmq_thread)
    rabbitmq_thread.start()
    socketio_handler.run()

if __name__ == '__main__':
    try:
        logging.basicConfig(level=logging.INFO)
        logging.getLogger("pika").propagate = False
        main()
    except KeyboardInterrupt:
        try:
            for t in threads:
                t.exit()
        sys.exit(0)
    except SystemExit:
        for t in threads:
            t.exit()
        os._exit(0)

字符串

问题

问题是,当RabbitMQHandler收到消息时,事件response_random_number无法到达Vue2应用程序。即使它在callback函数中发射。当我将random_number事件从Vue2应用程序发送到Python应用程序时,我确实从Vue2应用程序中的Python应用程序返回了response_random_number事件。
所以所有的连接都是独立工作的,而不是一起工作的。我的猜测是,有某种线程通信错误。我将id添加到SocketIOHandler类中,以确保它是相同的示例化对象,并且打印结果相同。
日志的套接字服务器:...',sending event: ...sent event: ...告诉我,函数被正确调用。

z9zf31ra

z9zf31ra1#

最后,我的解决方案是有两个Python应用程序:SocketIOHandler和RabbitMQHandler。
对我来说,重要的是RabbitMQHandler可以在RabbitMQ消息上向SocketIOClients发送消息。为此,我使用了SocketIO的client_manager。有几个选项可以从外部进程发出事件。最后,我使用了RedisManager,并在另一个不同的系统中直接使用了RabbitMQ的KombuManager。
在SocketIOHandler中

sio = socketio.AsyncServer(async_mode='aiohttp', cors_allowed_origins='*', async_handlers=True, client_manager=socketio.AsyncRedisManager('redis://localhost:6379/0'))

字符串
在RabbitMQHandler中

sio = socketio.RedisManager('redis://localhost:6379/0', write_only=True)


然后,您可以在RabbitMQHandler中调用sio.emit,方法与在SocketIOHandler中相同。
另见github issue

相关问题