asyncio.start\u unix\u server和redis的套接字错误

5ssjco0h  于 2021-06-09  发布在  Redis
关注(0)|答案(1)|浏览(477)

我正在尝试用python使用asyncio和unix域套接字构建一个玩具内存redis服务器。
我的最小示例只是返回值 baz 对于每个请求:

import asyncio

class RedisServer:
    def __init__(self):
        self.server_address = "/tmp/redis.sock"

    async def handle_req(self, reader, writer):
        await reader.readline()
        writer.write(b"$3\r\nbaz\r\n")
        await writer.drain()
        writer.close()
        await writer.wait_closed()

    async def main(self):
        server = await asyncio.start_unix_server(self.handle_req, self.server_address)
        async with server:
            await server.serve_forever()

    def run(self):
        asyncio.run(self.main())

RedisServer().run()

当我用 redis 使用以下脚本的客户端库,它可以工作:

import time
import redis

r = redis.Redis(unix_socket_path="/tmp/redis.sock")

r.get("foo")
time.sleep(1)
r.get("bar")

但是,如果我移除 time.sleep(1) ,有时有效,有时第二个请求失败,原因是:

Traceback (most recent call last):
  File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 706, in send_packed_command
    sendall(self._sock, item)
  File "/tmp/env/lib/python3.8/site-packages/redis/_compat.py", line 9, in sendall
    return sock.sendall(*args,**kwargs)
BrokenPipeError: [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "test.py", line 9, in <module>
    r.get("bar")
  File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 1606, in get
    return self.execute_command('GET', name)
  File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 900, in execute_command
    conn.send_command(*args)
  File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 725, in send_command
    self.send_packed_command(self.pack_command(*args),
  File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 717, in send_packed_command
    raise ConnectionError("Error %s while writing to socket. %s." %
redis.exceptions.ConnectionError: Error 32 while writing to socket. Broken pipe.

或:

Traceback (most recent call last):
  File "test.py", line 9, in <module>
    r.get("bar")
  File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 1606, in get
    return self.execute_command('GET', name)
  File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 901, in execute_command
    return self.parse_response(conn, command_name,**options)
  File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 915, in parse_response
    response = connection.read_response()
  File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 739, in read_response
    response = self._parser.read_response()
  File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 324, in read_response
    raw = self._buffer.readline()
  File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 256, in readline
    self._read_from_socket()
  File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 201, in _read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
redis.exceptions.ConnectionError: Connection closed by server.

我的实现似乎缺少了客户端库所期望的一些关键行为(可能是因为它是异步的)。我错过了什么?

rggaifut

rggaifut1#

如果要在每次请求后关闭套接字,则需要使用 write_eof() ,其中
刷新缓冲写入数据后,关闭流的写入端。
请参见docs.python.org->编写
稍微修改一下的代码如下所示:

async def handle_req(self, reader, writer):
    await reader.readline()
    writer.write(b"$3\r\nbaz\r\n")
    await writer.drain()
    writer.write_eof()
    writer.close()
    await writer.wait_closed()

通常,您不会在每次请求后关闭套接字。
以下示例仅用于说明目的,旨在说明无需关闭插座。当然,您总是读取一行数据,然后根据redis协议解释数据。在这里我们知道发送了两个get命令(每5行,指示符表示包含2个元素的数组,指示符表示字符串,字符串值'get'和一个字符串指示符以及相应的值,即key)

async def handle_req(self, reader, writer):
    print("start")
    for i in range(0, 2):
        for x in range(0, 5):
            print(await reader.readline())
        writer.write(b"$3\r\nbaz\r\n")
        await writer.drain()
    writer.write_eof()
    writer.close()
    await writer.wait_closed()

客户端发送方式如下:

print(r.get("foo"))
print(r.get("bar"))
time.sleep(1)

last time.sleep是为了确保客户端不会立即退出。
控制台上的输出是:

start
b'*2\r\n'
b'$3\r\n'
b'GET\r\n'
b'$3\r\n'
b'foo\r\n'
b'*2\r\n'
b'$3\r\n'
b'GET\r\n'
b'$3\r\n'
b'bar\r\n'

请注意 start 只输出一次,这表明我们可以处理多个请求,而不必立即关闭套接字。

相关问题