python pyzmq客户端在接收数据时永远挂起

e5nszbig  于 2023-06-04  发布在  Python
关注(0)|答案(1)|浏览(264)

下面是chatGPT v4创建的一个使用zeromq的发布-订阅模式的简单客户端-服务器示例。这是非常不言自明的,并服务于模式示范的目的。问题是客户端在从消息服务器接收数据时挂起。代码看起来很好,但我是zeromq的新手,所以我可能会错。无论如何,任何建议如何解决这个问题将不胜感激!
Server.py

# -*- coding: utf-8 -*-
import zmq
import threading
import json
import os
import time

# Connect the publisher socket
context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5556")

# Connect the router socket for receiving acknowledgements
router = context.socket(zmq.ROUTER)
router.bind("tcp://*:5557")

# Save message info to a dictionary (a stand-in for your database)
messages = {}

# Send messages
consumers = ['consumer1', 'consumer2', ]

for i in range(10):
    message_id = str(i)
    file_path = f'./content/{i}'

    # Save message info to the database
    messages[message_id] = {
        'file_path': file_path,
        'consumers': consumers.copy(),  # copy the list because we're going to modify it
        'processed_by': [],
    }

    # Send the message to all consumers
    publisher.send_json({
        'message_id': message_id,
        'text': f'This is message {i}',
        'media_path': file_path,
    })

# Cleanup process
def cleanup():
    while True:
        for message_id, message in messages.items():
            #print(message.items())
            #print(message['processed_by'])
            if set(message['consumers']) == set(message['processed_by']):
                print(f"Deleting file {message['file_path']}")
                # os.remove(message['file_path'])  # uncomment this to actually delete the file
                del messages[message_id]

        time.sleep(5)  # pause between cleanup runs

cleanup_thread = threading.Thread(target=cleanup)
cleanup_thread.start()

# Receive acknowledgements
while True:
    # Wait for next request from client
    message = router.recv_json()
    print(f"Received request: {message}")

    # Process the acknowledgement
    if message['message_id'] in messages:
        messages[message['message_id']]['processed_by'].append(message['consumer'])

    #time.sleep(5)

Client.py

import zmq
import time

# Prepare context and sockets
context = zmq.Context()
consumer_id = 'consumer1'  # change this for each consumer

# Connect the subscriber socket
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt_string(zmq.SUBSCRIBE, '')

# Connect the dealer socket for sending acknowledgements
dealer = context.socket(zmq.DEALER)
dealer.identity = consumer_id.encode()
dealer.connect("tcp://localhost:5557")


# Process messages
while True:
    message = subscriber.recv_json()   
    print(f"Received message: {message}")
    #print(message)

    # Send acknowledgement
    dealer.send_json({
        'message_id': message['message_id'],
        'consumer': consumer_id,
    })

    time.sleep(5)  # pause between processing messages

代码看起来是正确的,必须正常工作拜特但由于某种原因,它不会
在Windows 11上运行Python 3.10.5

rks48beu

rks48beu1#

代码的主要问题是PUB/SUB套接字是有损的--发布者和订阅者之间没有同步,如果订阅者在发送消息时没有连接和订阅,它将永远看不到该消息。
当您的客户端连接并完成与服务器的协商时,所有的消息都已经发送完毕。
如果您(a)首先启动订阅服务器,(b)向发布服务器添加sleep,给予订阅服务器有机会连接,您将看到订阅服务器按预期接收消息。
然后你会遇到第二个问题:
当您在ROUTER套接字上接收到一条消息时,该消息是一条 multipart 消息,由客户端ID和实际消息数据组成。所以当你写:

message = router.recv_json()

这将爆炸,因为您将接收客户端ID,而不是JSON消息数据,并且这将失败:

json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

服务器的工作版本可能如下所示:

import json
import zmq
import threading
import time

# Cleanup process
def cleanup():
    while True:
        for message_id, message in messages.items():
            # print(message.items())
            # print(message['processed_by'])
            if set(message["consumers"]) == set(message["processed_by"]):
                print(f"Deleting file {message['file_path']}")
                # os.remove(message['file_path'])  # uncomment this to actually delete the file
                del messages[message_id]

        time.sleep(5)  # pause between cleanup runs

# Connect the publisher socket
context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5556")

# Connect the router socket for receiving acknowledgements
router = context.socket(zmq.ROUTER)
router.bind("tcp://*:5557")

# Save message info to a dictionary (a stand-in for your database)
messages = {}

# Send messages
consumers = [
    "consumer1",
    "consumer2",
]

# give the subscriber a chance to connect
time.sleep(2)

for i in range(10):
    message_id = str(i)
    file_path = f"./content/{i}"

    # Save message info to the database
    messages[message_id] = {
        "file_path": file_path,
        "consumers": consumers.copy(),  # copy the list because we're going to modify it
        "processed_by": [],
    }

    # Send the message to all consumers
    publisher.send_json(
        {
            "message_id": message_id,
            "text": f"This is message {i}",
            "media_path": file_path,
        }
    )

cleanup_thread = threading.Thread(target=cleanup)
cleanup_thread.start()

# Receive acknowledgements
while True:
    # Wait for next request from client
    client, data = router.recv_multipart()
    message = json.loads(data)
    print(f"Received request: {message}")

    # Process the acknowledgement
    if message["message_id"] in messages:
        messages[message["message_id"]]["processed_by"].append(message["consumer"])

    # time.sleep(5)

...但这不是很好,因为我前面提到的pub/sub套接字的问题。如果您正在寻找客户端和服务器之间的某种同步,pub/sub套接字是错误的解决方案。

相关问题