python-3.x StreamReader.readline()在阅读换行符时不返回

vxqlmq5t  于 2023-01-03  发布在  Python
关注(0)|答案(1)|浏览(140)

因此,我一直在尝试用Python来测试流,并编写了以下代码。ServiceSubscription.py

class ServiceSubscription():
    def __init__(self) -> None:
        self.subscriber_connections = []
        self.service_connections = []
        self.server_listener = None
        # Dictionary of service readers where key is the name of the service and the value is the reader for the service
        self.service_readers = {}
    
    """
    Create the listening server on port 7777
    """
    async def initiate_server(self):
        server = await asyncio.start_server(self.handle_incoming, '127.0.0.1', 7777)
        addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
        print(f'Serving on {addrs}')
        
        async with server:
            await server.serve_forever()
    
    """
    Handle the incoming connection based on whether the connection is from a service or suscriber
    
    The first message sent should include either 'service:SERVICE_NAME' or 'suscriber: [SERVICE1, SERVICE2, ...]'
    """
    async def handle_incoming(self, reader: StreamReader, writer: StreamWriter):
        data = await reader.read(100)
        message = data.decode()
        addr = writer.get_extra_info('peername')

        print(f"Received {message!r} from {addr!r}")
        if ("Service:" in f"{message!r}"):
            message = message[0:7]
            self.service_connections.append(Connections(reader, writer, message))
            service_reader = ServiceReader(reader=reader, writer=writer)
            self.service_readers[message] = (service_reader)
            await service_reader.broadcast()
        
        elif ("Suscriber:" in f"{message!r}"):
            message = message[0:9]
            self.subscriber_connections.append(Connections(reader, writer, message))
            self.service_readers[message].add_suscribers(writer)
        
        else:
            pass

class ServiceReader():
    def __init__(self, reader:  StreamReader, writer: StreamWriter):
        self.reader = reader
        self.writer = writer
        self.suscribers: Writer = []
        self.loop = asyncio.get_event_loop()

    def stop(self):
        self._stop.set()
        
    """
    Add new subscriber's StreamWriter here
    """
    def add_suscribers(self, writer: StreamWriter):
        # Not sure if this will work
        self.suscribers.append(writer)
        
    """
    Read data and broadcast it to subscribed clients
    """
    async def broadcast(self):
        while not self.reader.at_eof():
            data = await self.reader.readline()
            if b'\n' in data:
                print(True)
            data = data.decode()
            print(data)

WriterTest.py

import asyncio
from os import linesep

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 7777)

    print(f'Send: {message!r}\n')
    writer.write(message.encode())
    await writer.drain()
    
    while not writer.is_closing():
        data = input("Type a message\n")
        data = (data + "\n").encode()
        writer.write(data)
        await writer.drain()
        
    writer.close()
    

asyncio.run(tcp_echo_client('Service: TEST'))

我同时运行python ServiceSubscription.py和python WriterTest.py来模拟客户端和服务器。
在运行www.example.com时ServiceSubscription.py,将打印“服务于('127.0.0.1',7777)"。在执行www.example.com时WriterTest.py,ServiceSubscription.py将打印“接收”服务:TEST' from('127.0.0.1',39923)"。但是,在www.example.com的连接关闭之前,键入的任何内容都不会打印出来WriterTest.py。连接关闭后,ServiceSubcription.py会打印出缓冲区中的剩余字节,并确认读取的数据中是否有换行符,但readline不会拾取它,因为它没有' t遇到换行符后不返回。

z31licg0

z31licg01#

数据编码在一个中断行旁边,几个变量被解压缩为 reader和writer 对象,然后同时is_closing()正在等待一个关闭进程,应该是close(),当消息没有字符时,您可以尝试设置一个条件。

import asyncio
from os import linesep

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 7777)

    print(f'Send: {message!r}\n')
    writer.write(message.encode())
    await writer.drain()
    
    while not writer.is_closing():
        data = input("Type a message\n")
        if len(data)==0: # empty data
            break

        data = (data + "\n").encode()
        writer.write(data)
        await writer.drain()

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

相关问题