我一直在尝试用 Python 的 asyncio 流(streams)做测试,并编写了以下代码。
ServiceSubscription.py
class ServiceSubscription():
    """TCP hub that accepts connections from services and from subscribers.

    The first message a peer sends declares its role:

    * ``Service: NAME`` registers the peer as the data source called NAME.
    * ``Suscriber: [NAME1, NAME2, ...]`` asks to receive the data of the
      listed, already registered services.  The historical spelling
      ``Suscriber`` is part of the wire protocol and is still accepted; the
      correct spelling ``Subscriber`` is accepted as well.
    """

    def __init__(self) -> None:
        self.subscriber_connections = []
        self.service_connections = []
        self.server_listener = None
        # Maps a service name to the ServiceReader that reads that service's
        # stream.
        self.service_readers = {}

    async def initiate_server(self, host: str = '127.0.0.1', port: int = 7777):
        """Create the listening server and serve connections until cancelled.

        Args:
            host: Interface to bind to. Defaults to the loopback address.
            port: TCP port to listen on. Defaults to 7777.
        """
        server = await asyncio.start_server(self.handle_incoming, host, port)
        addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
        print(f'Serving on {addrs}')
        async with server:
            await server.serve_forever()

    async def handle_incoming(self, reader: StreamReader, writer: StreamWriter):
        """Dispatch a new connection according to its first message.

        A service connection is read until it ends (or its reader is
        stopped); a subscriber connection is attached to every requested
        service that is currently registered.  Connections whose first
        message declares neither role are closed.

        Args:
            reader: Stream used to receive data from the peer.
            writer: Stream used to send data to the peer.
        """
        # The handshake has no terminator, so whatever arrives in the first
        # read (up to 100 bytes) is treated as the role declaration.
        data = await reader.read(100)
        message = data.decode()
        addr = writer.get_extra_info('peername')
        print(f"Received {message!r} from {addr!r}")

        # Split "Role: payload" on the first colon.  The original code sliced
        # the prefix itself (message[0:7]), which stored every service under
        # the same key and made every subscriber lookup fail.
        role, separator, payload = message.partition(":")
        role = role.strip().lower()
        payload = payload.strip()

        if separator and role == "service" and payload:
            # NOTE(review): Connections is defined outside this block; its
            # third argument is assumed to be a label for the connection.
            self.service_connections.append(Connections(reader, writer, payload))
            service_reader = ServiceReader(reader=reader, writer=writer)
            self.service_readers[payload] = service_reader
            try:
                await service_reader.broadcast()
            finally:
                # Do not leave a reader for a finished connection registered,
                # unless the name has already been taken over by a new one.
                if self.service_readers.get(payload) is service_reader:
                    del self.service_readers[payload]
                writer.close()
        elif separator and role in ("suscriber", "subscriber"):
            self.subscriber_connections.append(Connections(reader, writer, payload))
            requested = [
                name.strip()
                for name in payload.strip("[]").split(",")
                if name.strip()
            ]
            for name in requested:
                service_reader = self.service_readers.get(name)
                if service_reader is None:
                    print(f"Unknown service {name!r} requested by {addr!r}")
                    continue
                service_reader.add_suscribers(writer)
        else:
            # Neither role was declared: release the socket instead of
            # leaving it open with nobody reading from it.
            writer.close()
class ServiceReader():
    """Read lines from one service connection and relay them to subscribers.

    Args:
        reader: Stream the service's data is read from.
        writer: Stream that writes back to the service.
    """

    def __init__(self, reader: StreamReader, writer: StreamWriter):
        self.reader = reader
        self.writer = writer
        # Writers of the clients that subscribed to this service.
        self.suscribers: list[StreamWriter] = []
        self.loop = asyncio.get_event_loop()
        # Set by stop(); checked by broadcast() before each read.
        self._stop = asyncio.Event()

    def stop(self):
        """Ask broadcast() to finish.

        The flag is checked between reads, so a read that is already waiting
        for data completes (or hits end of stream) before the loop exits.
        """
        self._stop.set()

    def add_suscribers(self, writer: StreamWriter):
        """Register a subscriber's StreamWriter to receive this service's data."""
        self.suscribers.append(writer)

    async def broadcast(self):
        """Read lines until end of stream or stop(), relaying each one.

        Every line is printed (preceded by ``True`` when it contains a
        newline) and the raw bytes are written to all registered subscribers.
        """
        while not self._stop.is_set() and not self.reader.at_eof():
            data = await self.reader.readline()
            if b'\n' in data:
                print(True)
            print(data.decode())
            # readline() returns b'' at end of stream; nothing to relay then.
            if data:
                await self._forward(data)

    async def _forward(self, data: bytes) -> None:
        """Write *data* to every live subscriber, dropping the dead ones."""
        # Iterate over a copy because dead subscribers are removed on the way.
        for subscriber in list(self.suscribers):
            if subscriber.is_closing():
                self.suscribers.remove(subscriber)
                continue
            subscriber.write(data)
            try:
                await subscriber.drain()
            except ConnectionError:
                # The peer went away mid-write; stop sending to it.
                self.suscribers.remove(subscriber)
WriterTest.py
import asyncio
from os import linesep
async def tcp_echo_client(message):
    """Connect to 127.0.0.1:7777, send *message*, then send typed lines.

    After the initial message, each line read from standard input is sent
    with a trailing newline until the connection starts closing or standard
    input reaches end of file.

    Args:
        message: Text sent as the first message on the connection.
    """
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 7777)
    loop = asyncio.get_running_loop()

    print(f'Send: {message!r}\n')
    try:
        writer.write(message.encode())
        await writer.drain()

        while not writer.is_closing():
            try:
                # input() blocks; running it in a worker thread keeps the
                # event loop free to notice a dropped connection, which is
                # what makes the is_closing() check above meaningful.
                line = await loop.run_in_executor(
                    None, input, "Type a message\n")
            except EOFError:
                # Standard input was closed: nothing more to send.
                break
            writer.write((line + "\n").encode())
            await writer.drain()
    finally:
        writer.close()
        try:
            await writer.wait_closed()
        except ConnectionError:
            # The peer already dropped the connection; it is closed anyway.
            pass
# Run the client coroutine to completion, with 'Service: TEST' as the first
# message written to the connection.
asyncio.run(
    tcp_echo_client(message='Service: TEST')
)
我同时运行python ServiceSubscription.py和python WriterTest.py来模拟客户端和服务器。
运行 ServiceSubscription.py 时,会打印 "Serving on ('127.0.0.1', 7777)"。运行 WriterTest.py 时,ServiceSubscription.py 会打印 "Received 'Service: TEST' from ('127.0.0.1', 39923)"。但是,在 WriterTest.py 的连接关闭之前,键入的任何内容都不会被打印出来。连接关闭后,ServiceSubscription.py 才会打印出缓冲区中剩余的字节,并确认读到的数据中确实含有换行符;然而 readline 之前并没有读到这些数据,因为它在遇到换行符后没有返回。
1条答案
按热度按时间z31licg01#
数据在编码时末尾附带了一个换行符,open_connection 返回的几个值被解包为 reader 和 writer 对象;随后,while 循环条件中的 is_closing() 是在等待一个关闭过程,而关闭应该由 close() 来触发。您可以尝试设置一个条件:当消息没有任何字符时,调用 close()。