asyncio.start_unix_server 和 redis 的套接字错误
Socket errors with asyncio.start_unix_server and redis
我正在尝试使用 asyncio 和 Unix 域套接字在 Python 中构建玩具内存 Redis 服务器。
我的最小示例只是 returns 每个请求的值 baz
:
import asyncio
class RedisServer:
def __init__(self):
self.server_address = "/tmp/redis.sock"
async def handle_req(self, reader, writer):
await reader.readline()
writer.write(b"\r\nbaz\r\n")
await writer.drain()
writer.close()
await writer.wait_closed()
async def main(self):
server = await asyncio.start_unix_server(self.handle_req, self.server_address)
async with server:
await server.serve_forever()
def run(self):
asyncio.run(self.main())
RedisServer().run()
当我用 the redis
client library 使用以下脚本测试两个连续的客户端请求时,它有效:
import time
import redis
r = redis.Redis(unix_socket_path="/tmp/redis.sock")
r.get("foo")
time.sleep(1)
r.get("bar")
但是,如果我删除 time.sleep(1)
,有时它会起作用,有时第二个请求会失败:
Traceback (most recent call last):
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 706, in send_packed_command
sendall(self._sock, item)
File "/tmp/env/lib/python3.8/site-packages/redis/_compat.py", line 9, in sendall
return sock.sendall(*args, **kwargs)
BrokenPipeError: [Errno 32] Broken pipe
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "test.py", line 9, in <module>
r.get("bar")
File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 1606, in get
return self.execute_command('GET', name)
File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 900, in execute_command
conn.send_command(*args)
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 725, in send_command
self.send_packed_command(self.pack_command(*args),
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 717, in send_packed_command
raise ConnectionError("Error %s while writing to socket. %s." %
redis.exceptions.ConnectionError: Error 32 while writing to socket. Broken pipe.
或:
Traceback (most recent call last):
File "test.py", line 9, in <module>
r.get("bar")
File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 1606, in get
return self.execute_command('GET', name)
File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 901, in execute_command
return self.parse_response(conn, command_name, **options)
File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 915, in parse_response
response = connection.read_response()
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 739, in read_response
response = self._parser.read_response()
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 324, in read_response
raw = self._buffer.readline()
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 256, in readline
self._read_from_socket()
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 201, in _read_from_socket
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
redis.exceptions.ConnectionError: Connection closed by server.
我的实现似乎缺少客户端库期望的一些关键行为(可能是因为它是异步的)。我错过了什么?
如果你想在每次请求后关闭套接字,你需要使用 write_eof()
,
Close the write end of the stream after the buffered write data is flushed.
见
docs.python.org -> write_eof
您的代码稍微修改后如下所示:
async def handle_req(self, reader, writer):
await reader.readline()
writer.write(b"\r\nbaz\r\n")
await writer.drain()
writer.write_eof()
writer.close()
await writer.wait_closed()
通常您不会在每次请求后关闭套接字。
以下示例仅供说明,旨在说明不需要关闭套接字。当然,你总是会读取一行,然后根据 Redis 协议解释数据。我们在这里知道发送了两个 GET 命令(每 5 行,具有 2 个元素的数组指示符,字符串指示符,字符串值 'GET' 以及字符串指示符和相应的值,即键)
async def handle_req(self, reader, writer):
print("start")
for i in range(0, 2):
for x in range(0, 5):
print(await reader.readline())
writer.write(b"\r\nbaz\r\n")
await writer.drain()
writer.write_eof()
writer.close()
await writer.wait_closed()
客户端发送是这样完成的:
print(r.get("foo"))
print(r.get("bar"))
time.sleep(1)
最后一个time.sleep是保证客户端不会马上退出
控制台上的输出是:
start
b'*2\r\n'
b'\r\n'
b'GET\r\n'
b'\r\n'
b'foo\r\n'
b'*2\r\n'
b'\r\n'
b'GET\r\n'
b'\r\n'
b'bar\r\n'
注意start
只输出一次,说明我们可以处理多个请求而不用马上关闭socket
我正在尝试使用 asyncio 和 Unix 域套接字在 Python 中构建玩具内存 Redis 服务器。
我的最小示例只是 returns 每个请求的值 baz
:
import asyncio
class RedisServer:
def __init__(self):
self.server_address = "/tmp/redis.sock"
async def handle_req(self, reader, writer):
await reader.readline()
writer.write(b"\r\nbaz\r\n")
await writer.drain()
writer.close()
await writer.wait_closed()
async def main(self):
server = await asyncio.start_unix_server(self.handle_req, self.server_address)
async with server:
await server.serve_forever()
def run(self):
asyncio.run(self.main())
RedisServer().run()
当我用 the redis
client library 使用以下脚本测试两个连续的客户端请求时,它有效:
import time
import redis
r = redis.Redis(unix_socket_path="/tmp/redis.sock")
r.get("foo")
time.sleep(1)
r.get("bar")
但是,如果我删除 time.sleep(1)
,有时它会起作用,有时第二个请求会失败:
Traceback (most recent call last):
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 706, in send_packed_command
sendall(self._sock, item)
File "/tmp/env/lib/python3.8/site-packages/redis/_compat.py", line 9, in sendall
return sock.sendall(*args, **kwargs)
BrokenPipeError: [Errno 32] Broken pipe
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "test.py", line 9, in <module>
r.get("bar")
File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 1606, in get
return self.execute_command('GET', name)
File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 900, in execute_command
conn.send_command(*args)
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 725, in send_command
self.send_packed_command(self.pack_command(*args),
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 717, in send_packed_command
raise ConnectionError("Error %s while writing to socket. %s." %
redis.exceptions.ConnectionError: Error 32 while writing to socket. Broken pipe.
或:
Traceback (most recent call last):
File "test.py", line 9, in <module>
r.get("bar")
File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 1606, in get
return self.execute_command('GET', name)
File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 901, in execute_command
return self.parse_response(conn, command_name, **options)
File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 915, in parse_response
response = connection.read_response()
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 739, in read_response
response = self._parser.read_response()
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 324, in read_response
raw = self._buffer.readline()
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 256, in readline
self._read_from_socket()
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 201, in _read_from_socket
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
redis.exceptions.ConnectionError: Connection closed by server.
我的实现似乎缺少客户端库期望的一些关键行为(可能是因为它是异步的)。我错过了什么?
如果你想在每次请求后关闭套接字,你需要使用 write_eof()
,
Close the write end of the stream after the buffered write data is flushed.
见 docs.python.org -> write_eof
您的代码稍微修改后如下所示:
async def handle_req(self, reader, writer):
await reader.readline()
writer.write(b"\r\nbaz\r\n")
await writer.drain()
writer.write_eof()
writer.close()
await writer.wait_closed()
通常您不会在每次请求后关闭套接字。
以下示例仅供说明,旨在说明不需要关闭套接字。当然,你总是会读取一行,然后根据 Redis 协议解释数据。我们在这里知道发送了两个 GET 命令(每 5 行,具有 2 个元素的数组指示符,字符串指示符,字符串值 'GET' 以及字符串指示符和相应的值,即键)
async def handle_req(self, reader, writer):
print("start")
for i in range(0, 2):
for x in range(0, 5):
print(await reader.readline())
writer.write(b"\r\nbaz\r\n")
await writer.drain()
writer.write_eof()
writer.close()
await writer.wait_closed()
客户端发送是这样完成的:
print(r.get("foo"))
print(r.get("bar"))
time.sleep(1)
最后一个time.sleep是保证客户端不会马上退出
控制台上的输出是:
start
b'*2\r\n'
b'\r\n'
b'GET\r\n'
b'\r\n'
b'foo\r\n'
b'*2\r\n'
b'\r\n'
b'GET\r\n'
b'\r\n'
b'bar\r\n'
注意start
只输出一次,说明我们可以处理多个请求而不用马上关闭socket