我用python写了一个简单的pub/sub草图。下面是完整的源代码,但我遇到的问题是,当代理通过send()
广播消息时,客户端不会recv()
任何东西。其他的一切似乎都很正常:listen()
和accept()
正在工作,当客户端调用send()
时,代理的recv()
会得到消息。
下面是两个客户端和代理之间交互的序列图:
Broker Client 62863 Client 62867
------------------------------------------------------------
start()
start()
(62863) joined.
start()
(62867) joined.
Hello from 62863
(62863): Hello from 62863
(62863) => (62867)
在最后一步中,Broker调用send('Hello from 62863')
,但客户端62867的recv()
函数没有接收到它。
有什么建议吗?
以下是完整的Broker代码:
import socket
import threading
class Broker(object):
def __init__(self, host='', port=5000):
self._host = host
self._port = port
self._subscribers = []
def start(self):
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.bind((self._host, self._port))
while True:
"""
Wait for a client to request a connection and spawn a thread to
receive and forward messages from that client.
"""
self._socket.listen()
subscriber, addr = self._socket.accept()
print(f'{addr} joined.', flush=True)
self._subscribers.append(subscriber)
threading.Thread(target=self.listen_thread, args=(subscriber,)).start()
def listen_thread(self, publisher):
"""
Wait for a message to arrive from a publisher, broadcast to all other
subscribers.
"""
while True:
msg = publisher.recv(1024).decode()
if msg is not None:
print(f'{publisher.getpeername()} published: {msg}', end='', flush=True)
self.broadcast(publisher, msg)
else:
print(f'{publisher.getpeername()} has disconnected')
return
def broadcast(self, publisher, msg):
for subscriber in self._subscribers:
if publisher != subscriber: # don't send to yourself
print(f'{publisher.getpeername()} => {subscriber.getpeername()}', flush=True)
try:
subscriber.send(msg) # NOTE: See solution below!!!
except:
# broken socket, remove from subscriber list
self._subscribers.remove(subscriber)
if __name__ == "__main__":
Broker().start()
下面是相应的客户端代码:
import socket
import threading
import sys
class StdioClient(object):
"""
A simple pub/sub client:
Anything received on stdin is published to the broker.
Concurrently, anything broadcast by the broker is printed on stdout.
"""
def __init__(self, host='localhost', port=5000):
self._host = host
self._port = port
def start(self):
self._sock = socket.socket()
self._sock.connect((self._host, self._port))
threading.Thread(target=self.stdin_to_sock).start()
threading.Thread(target=self.sock_to_stdout).start()
def stdin_to_sock(self):
"""
Send anything received on stdin to the broker.
"""
for msg in sys.stdin:
self._sock.send(bytes(msg, 'utf-8'))
def sock_to_stdout(self):
"""
Print anything received from the broker on stdout.
"""
while True:
msg = self._sock.recv(1024) # <<<= This never gets a message
print(msg.decode('utf-8'), eol='', flush=True)
if __name__ == '__main__':
StdioClient().start()
1条答案
按热度按时间2j4z5cfb1#
解决了。这是一个愚蠢的,可以预防的,“我应该知道更好的”错误。就在这里:
永远不要使用毯子
except:
来捕获错误,除非你真的知道你在做什么。在本例中,send()
引发了错误,因为msg
是字符串,而不是字节,但是对于blanketexcept:
,没有可见的错误。寓意:始终限定
except:
子句。(In如果你好奇,修复是这样的):