设置
我有一个python应用程序,它应该使用来自RabbitMQ的消息,并充当Vue2应用程序的SocketIO服务器。当它从RabbitMQ收到消息时,它应该通过SocketIO向Vue2应用程序发送消息。因此我写了两个类RabbitMQHandler
和SocketIOHandler
。我在一个单独的线程中启动RabbitMQHandler
,以便RabbitMQ消费和wsgi服务器可以并行运行。
编码
import random
import threading
import socketio
import eventlet
import sys
import os
import uuid
import pika
from dotenv import load_dotenv
import logging
class RabbitMQHandler():
def __init__(self, RABBITMQ_USER, RABBITMQ_PW, RABBITMQ_IP):
self.queue_name = 'myqueue'
self.exchange_name = 'myqueue'
credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PW)
self.connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_IP, 5672, '/', credentials))
self.channel = self.connection.channel()
self.channel.queue_declare(queue=self.queue_name)
self.channel.exchange_declare(exchange=self.exchange_name, exchange_type='fanout')
self.channel.queue_bind(exchange=self.exchange_name, queue=self.queue_name)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.connection.close()
def run(self, callback):
logging.info('start consuming messages...')
self.channel.basic_consume(queue=self.queue_name,auto_ack=True, on_message_callback=callback)
self.channel.start_consuming()
class SocketIOHandler():
def __init__(self):
self.id = str(uuid.uuid4())
# create a Socket.IO server
self.sio = socketio.Server(async_mode='eventlet', cors_allowed_origins='*')
# wrap with a WSGI application
self.app = socketio.WSGIApp(self.sio)
self.sio.on('connect_to_backend', self.handle_connect)
self.sio.on('random_number', self.handle_random_number)
def handle_connect(self, sid, msg):
logging.info('new socket io message')
self.emit('connect_success', {
'success': True,
})
def handle_random_number(self, sid, msg):
logging.info('handle_random_number')
self.emit('response_random_number', { 'number': random.randint(0,10)})
def emit(self, event, msg):
logging.info('socket server: {}'.format(self.id))
logging.info('sending event: "{}"'.format(event))
self.sio.emit(event, msg)
logging.info('sent event: "{}"'.format(event))
def run(self):
logging.info('start web socket on port 8765...')
eventlet.wsgi.server(eventlet.listen(('', 8765)), self.app)
def start_rabbitmq_handler(socketio_handler, RABBITMQ_USER, RABBITMQ_PW, RABBITMQ_IP):
def callback(ch, method, properties, body):
logging.info('rabbitmq handler')
socketio_handler.emit('response_random_number', { 'number': random.randint(0,10)})
with RabbitMQHandler(RABBITMQ_USER, RABBITMQ_PW, RABBITMQ_IP) as rabbitmq_handler:
rabbitmq_handler.run(callback=callback)
threads = []
def main():
global threads
load_dotenv()
RABBITMQ_USER = os.getenv('RABBITMQ_USER')
RABBITMQ_PW = os.getenv('RABBITMQ_PW')
RABBITMQ_IP = os.getenv('RABBITMQ_IP')
socketio_handler = SocketIOHandler()
rabbitmq_thread = threading.Thread(target=start_rabbitmq_handler, args=(socketio_handler, RABBITMQ_USER, RABBITMQ_PW, RABBITMQ_IP))
threads.append(rabbitmq_thread)
rabbitmq_thread.start()
socketio_handler.run()
if __name__ == '__main__':
try:
logging.basicConfig(level=logging.INFO)
logging.getLogger("pika").propagate = False
main()
except KeyboardInterrupt:
try:
for t in threads:
t.exit()
sys.exit(0)
except SystemExit:
for t in threads:
t.exit()
os._exit(0)
字符串
问题
问题是,当RabbitMQHandler
收到消息时,事件response_random_number
无法到达Vue2应用程序。即使它在callback
函数中发射。当我将random_number
事件从Vue2应用程序发送到Python应用程序时,我确实从Vue2应用程序中的Python应用程序返回了response_random_number
事件。
所以所有的连接都是独立工作的,而不是一起工作的。我的猜测是,有某种线程通信错误。我将id
添加到SocketIOHandler
类中,以确保它是相同的示例化对象,并且打印结果相同。
日志的套接字服务器:...',sending event: ...
和sent event: ...
告诉我,函数被正确调用。
1条答案
按热度按时间z9zf31ra1#
最后,我的解决方案是有两个Python应用程序:SocketIOHandler和RabbitMQHandler。
对我来说,重要的是RabbitMQHandler可以在RabbitMQ消息上向SocketIOClients发送消息。为此,我使用了SocketIO的
client_manager
。有几个选项可以从外部进程发出事件。最后,我使用了RedisManager,并在另一个不同的系统中直接使用了RabbitMQ的KombuManager。在SocketIOHandler中
字符串
在RabbitMQHandler中
型
然后,您可以在RabbitMQHandler中调用
sio.emit
,方法与在SocketIOHandler中相同。另见github issue。