下面是chatGPT v4创建的一个使用zeromq的发布-订阅模式的简单客户端-服务器示例。这是非常不言自明的,并服务于模式示范的目的。问题是客户端在从消息服务器接收数据时挂起。代码看起来很好,但我是zeromq的新手,所以我可能会错。无论如何,任何建议如何解决这个问题将不胜感激!
Server.py
# -*- coding: utf-8 -*-
import zmq
import threading
import json
import os
import time
# Connect the publisher socket
context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5556")
# Connect the router socket for receiving acknowledgements
router = context.socket(zmq.ROUTER)
router.bind("tcp://*:5557")
# Save message info to a dictionary (a stand-in for your database)
messages = {}
# Send messages
consumers = ['consumer1', 'consumer2', ]
for i in range(10):
message_id = str(i)
file_path = f'./content/{i}'
# Save message info to the database
messages[message_id] = {
'file_path': file_path,
'consumers': consumers.copy(), # copy the list because we're going to modify it
'processed_by': [],
}
# Send the message to all consumers
publisher.send_json({
'message_id': message_id,
'text': f'This is message {i}',
'media_path': file_path,
})
# Cleanup process
def cleanup():
while True:
for message_id, message in messages.items():
#print(message.items())
#print(message['processed_by'])
if set(message['consumers']) == set(message['processed_by']):
print(f"Deleting file {message['file_path']}")
# os.remove(message['file_path']) # uncomment this to actually delete the file
del messages[message_id]
time.sleep(5) # pause between cleanup runs
cleanup_thread = threading.Thread(target=cleanup)
cleanup_thread.start()
# Receive acknowledgements
while True:
# Wait for next request from client
message = router.recv_json()
print(f"Received request: {message}")
# Process the acknowledgement
if message['message_id'] in messages:
messages[message['message_id']]['processed_by'].append(message['consumer'])
#time.sleep(5)
Client.py
import zmq
import time
# Prepare context and sockets
context = zmq.Context()
consumer_id = 'consumer1' # change this for each consumer
# Connect the subscriber socket
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt_string(zmq.SUBSCRIBE, '')
# Connect the dealer socket for sending acknowledgements
dealer = context.socket(zmq.DEALER)
dealer.identity = consumer_id.encode()
dealer.connect("tcp://localhost:5557")
# Process messages
while True:
message = subscriber.recv_json()
print(f"Received message: {message}")
#print(message)
# Send acknowledgement
dealer.send_json({
'message_id': message['message_id'],
'consumer': consumer_id,
})
time.sleep(5) # pause between processing messages
代码看起来是正确的,必须正常工作拜特但由于某种原因,它不会
在Windows 11上运行Python 3.10.5
1条答案
按热度按时间rks48beu1#
代码的主要问题是PUB/SUB套接字是有损的--发布者和订阅者之间没有同步,如果订阅者在发送消息时没有连接和订阅,它将永远看不到该消息。
当您的客户端连接并完成与服务器的协商时,所有的消息都已经发送完毕。
如果您(a)首先启动订阅服务器,(b)向发布服务器添加
sleep
,给予订阅服务器有机会连接,您将看到订阅服务器按预期接收消息。然后你会遇到第二个问题:
当您在ROUTER套接字上接收到一条消息时,该消息是一条 multipart 消息,由客户端ID和实际消息数据组成。所以当你写:
这将爆炸,因为您将接收客户端ID,而不是JSON消息数据,并且这将失败:
服务器的工作版本可能如下所示:
...但这不是很好,因为我前面提到的pub/sub套接字的问题。如果您正在寻找客户端和服务器之间的某种同步,pub/sub套接字是错误的解决方案。