如何在 python-trio 中的 KeyboardInterrupt 后清理连接
How to clean up connections after KeyboardInterrupt in python-trio
我的 class 当连接到服务器时应该立即发送 sign in 字符串,然后当会话结束时它应该发送 退出 字符串并清理套接字。下面是我的代码。
import trio
class test:
_buffer = 8192
_max_retry = 4
def __init__(self, host='127.0.0.1', port=12345, usr='user', pwd='secret'):
self.host = str(host)
self.port = int(port)
self.usr = str(usr)
self.pwd = str(pwd)
self._nl = b'\r\n'
self._attempt = 0
self._queue = trio.Queue(30)
self._connected = trio.Event()
self._end_session = trio.Event()
@property
def connected(self):
return self._connected.is_set()
async def _sender(self, client_stream, nursery):
print('## sender: started!')
q = self._queue
while True:
cmd = await q.get()
print('## sending to the server:\n{!r}\n'.format(cmd))
if self._end_session.is_set():
nursery.cancel_scope.shield = True
with trio.move_on_after(1):
await client_stream.send_all(cmd)
nursery.cancel_scope.shield = False
await client_stream.send_all(cmd)
async def _receiver(self, client_stream, nursery):
print('## receiver: started!')
buff = self._buffer
while True:
data = await client_stream.receive_some(buff)
if not data:
print('## receiver: connection closed')
self._end_session.set()
break
print('## got data from the server:\n{!r}'.format(data))
async def _watchdog(self, nursery):
await self._end_session.wait()
await self._queue.put(self._logoff)
self._connected.clear()
nursery.cancel_scope.cancel()
@property
def _login(self, *a, **kw):
nl = self._nl
usr, pwd = self.usr, self.pwd
return nl.join(x.encode() for x in ['Login', usr,pwd]) + 2*nl
@property
def _logoff(self, *a, **kw):
nl = self._nl
return nl.join(x.encode() for x in ['Logoff']) + 2*nl
async def _connect(self):
host, port = self.host, self.port
print('## connecting to {}:{}'.format(host, port))
try:
client_stream = await trio.open_tcp_stream(host, port)
except OSError as err:
print('##', err)
else:
async with client_stream:
self._end_session.clear()
self._connected.set()
self._attempt = 0
# Sign in as soon as connected
await self._queue.put(self._login)
async with trio.open_nursery() as nursery:
print("## spawning watchdog...")
nursery.start_soon(self._watchdog, nursery)
print("## spawning sender...")
nursery.start_soon(self._sender, client_stream, nursery)
print("## spawning receiver...")
nursery.start_soon(self._receiver, client_stream, nursery)
def connect(self):
while self._attempt <= self._max_retry:
try:
trio.run(self._connect)
trio.run(trio.sleep, 1)
self._attempt += 1
except KeyboardInterrupt:
self._end_session.set()
print('Bye bye...')
break
tst = test()
tst.connect()
我的逻辑不太对。好吧,如果我杀死 netcat
侦听器,它会起作用,所以我的会话如下所示:
## connecting to 127.0.0.1:12345
## spawning watchdog...
## spawning sender...
## spawning receiver...
## receiver: started!
## sender: started!
## sending to the server:
b'Login\r\nuser\r\nsecret\r\n\r\n'
## receiver: connection closed
## sending to the server:
b'Logoff\r\n\r\n'
请注意,Logoff
字符串已发送出去,但在此处没有意义,因为那时连接已经断开。
然而我的目标是 Logoff
当用户 KeyboardInterrupt
。在这种情况下,我的会话看起来类似于:
## connecting to 127.0.0.1:12345
## spawning watchdog...
## spawning sender...
## spawning receiver...
## receiver: started!
## sender: started!
## sending to the server:
b'Login\r\nuser\r\nsecret\r\n\r\n'
Bye bye...
请注意,Logoff
尚未发送。
有什么想法吗?
这里你的调用树看起来像这样:
connect
|
+- _connect*
|
+- _watchdog*
|
+- _sender*
|
+- _receiver*
*
表示 4 个三重任务。 _connect
任务位于 nursery 块的末尾,等待子任务完成。 _watchdog
任务在await self._end_session.wait()
被阻塞,_sender
任务在await q.get()
被阻塞,_receiver
任务在await client_stream.receive_some(...)
被阻塞。
当您按下 control-C 时,标准的 Python 语义是 Python 代码的任何位 运行 突然引发 KeyboardInterrupt
。在这种情况下,您有 4 个不同的任务 运行,因此这些被阻止的操作之一会被随机选择 [1],并引发 KeyboardInterrupt
。这意味着可能会发生一些不同的事情:
如果 _watchdog
的 wait
调用引发 KeyboardInterrupt
,那么 _watchdog
方法会立即退出,因此它甚至不会尝试发送 logout
。然后作为展开堆栈的一部分,trio 取消所有其他任务,一旦它们退出,KeyboardInterrupt
就会继续向上传播,直到它到达 connect
中的 finally
块。此时你尝试使用 self._end_session.set()
通知看门狗任务,但它不再是 运行,所以它没有注意到。
如果_sender
的q.get()
调用引发KeyboardInterrupt
,那么_sender
方法立即退出,所以即使_watchdog
确实要求它发送注销消息,它不会在那里注意到。在任何情况下,trio 都会继续取消看门狗和接收器任务,然后事情会像上面那样进行。
如果 _receiver
的 receive_all
调用加注 KeyboardInterrupt
...同样的事情发生。
细微之处:_connect
也可以接收 KeyboardInterrupt
,它做同样的事情:取消所有子项,然后等待它们停止,然后再允许 KeyboardInterrupt
继续传播。
如果你想可靠地捕获 control-C 然后用它做一些事情,那么它在某个随机点被引发的事情是相当麻烦的。最简单的方法是使用 Trio's support for catching signals 捕捉 signal.SIGINT
信号,这是 Python 通常转换为 KeyboardInterrupt
的信号。 ("INT" 代表 "interrupt"。)类似于:
async def _control_c_watcher(self):
# This API is currently a little cumbersome, sorry, see
# https://github.com/python-trio/trio/issues/354
with trio.catch_signals({signal.SIGINT}) as batched_signal_aiter:
async for _ in batched_signal_aiter:
self._end_session.set()
# We exit the loop, restoring the normal behavior of
# control-C. This way hitting control-C once will try to
# do a polite shutdown, but if that gets stuck the user
# can hit control-C again to raise KeyboardInterrupt and
# force things to exit.
break
然后开始此 运行 以及您的其他任务。
您还遇到了问题,在您的 _watchdog
方法中,它将 logoff
请求放入队列中——因此安排了稍后由 _sender
任务发送的消息– 然后立即取消所有任务,这样 _sender
任务可能没有机会看到消息并对其做出反应!总的来说,我发现当我只在必要时使用任务时,我的代码工作得更好。与其有一个发送者任务,然后在你想发送消息时将消息放入队列,为什么不让想要发送消息的代码直接调用 stream.send_all
?你必须注意的一件事是,如果你有多个任务可能同时发送东西,你可能想使用 trio.Lock()
来确保它们不会通过调用 send_all
相互碰撞同时:
async def send_all(self, data):
async with self.send_lock:
await self.send_stream.send_all(data)
async def do_logoff(self):
# First send the message
await self.send_all(b"Logoff\r\n\r\n")
# And then, *after* the message has been sent, cancel the tasks
self.nursery.cancel()
如果你这样做,你可能能够完全摆脱看门狗任务和 _end_session
事件。
我在这里时关于您的代码的一些其他说明:
像这样多次调用 trio.run
很不寻常。正常的风格是在你的程序的顶部调用它一次,并将你所有的真实代码放在里面。一旦你退出 trio.run
,trio 的所有状态都将丢失,你绝对不会 运行 任何并发任务(所以不可能 可能 正在监听和请注意您对 _end_session.set()
的调用!)。通常,几乎所有 Trio 函数都假设您已经在调用 trio.run
中。事实证明,现在你可以在开始 trio 之前调用 trio.Queue()
而不会出现异常,但这基本上只是巧合。
在 _sender
内部使用屏蔽对我来说很奇怪。屏蔽通常是您几乎从不想要使用的高级功能,我认为这也不例外。
希望对您有所帮助!如果您想更多地讨论 style/design 此类问题,但又担心它们可能过于模糊导致堆栈溢出 ("is this program designed well?"),请随时访问 trio chat channel.
[1] 好吧,实际上 trio 可能出于各种原因选择了主要任务,但这并不能保证,无论如何这对这里没有影响。
我的 class 当连接到服务器时应该立即发送 sign in 字符串,然后当会话结束时它应该发送 退出 字符串并清理套接字。下面是我的代码。
import trio
class test:
_buffer = 8192
_max_retry = 4
def __init__(self, host='127.0.0.1', port=12345, usr='user', pwd='secret'):
self.host = str(host)
self.port = int(port)
self.usr = str(usr)
self.pwd = str(pwd)
self._nl = b'\r\n'
self._attempt = 0
self._queue = trio.Queue(30)
self._connected = trio.Event()
self._end_session = trio.Event()
@property
def connected(self):
return self._connected.is_set()
async def _sender(self, client_stream, nursery):
print('## sender: started!')
q = self._queue
while True:
cmd = await q.get()
print('## sending to the server:\n{!r}\n'.format(cmd))
if self._end_session.is_set():
nursery.cancel_scope.shield = True
with trio.move_on_after(1):
await client_stream.send_all(cmd)
nursery.cancel_scope.shield = False
await client_stream.send_all(cmd)
async def _receiver(self, client_stream, nursery):
print('## receiver: started!')
buff = self._buffer
while True:
data = await client_stream.receive_some(buff)
if not data:
print('## receiver: connection closed')
self._end_session.set()
break
print('## got data from the server:\n{!r}'.format(data))
async def _watchdog(self, nursery):
await self._end_session.wait()
await self._queue.put(self._logoff)
self._connected.clear()
nursery.cancel_scope.cancel()
@property
def _login(self, *a, **kw):
nl = self._nl
usr, pwd = self.usr, self.pwd
return nl.join(x.encode() for x in ['Login', usr,pwd]) + 2*nl
@property
def _logoff(self, *a, **kw):
nl = self._nl
return nl.join(x.encode() for x in ['Logoff']) + 2*nl
async def _connect(self):
host, port = self.host, self.port
print('## connecting to {}:{}'.format(host, port))
try:
client_stream = await trio.open_tcp_stream(host, port)
except OSError as err:
print('##', err)
else:
async with client_stream:
self._end_session.clear()
self._connected.set()
self._attempt = 0
# Sign in as soon as connected
await self._queue.put(self._login)
async with trio.open_nursery() as nursery:
print("## spawning watchdog...")
nursery.start_soon(self._watchdog, nursery)
print("## spawning sender...")
nursery.start_soon(self._sender, client_stream, nursery)
print("## spawning receiver...")
nursery.start_soon(self._receiver, client_stream, nursery)
def connect(self):
while self._attempt <= self._max_retry:
try:
trio.run(self._connect)
trio.run(trio.sleep, 1)
self._attempt += 1
except KeyboardInterrupt:
self._end_session.set()
print('Bye bye...')
break
tst = test()
tst.connect()
我的逻辑不太对。好吧,如果我杀死 netcat
侦听器,它会起作用,所以我的会话如下所示:
## connecting to 127.0.0.1:12345
## spawning watchdog...
## spawning sender...
## spawning receiver...
## receiver: started!
## sender: started!
## sending to the server:
b'Login\r\nuser\r\nsecret\r\n\r\n'
## receiver: connection closed
## sending to the server:
b'Logoff\r\n\r\n'
请注意,Logoff
字符串已发送出去,但在此处没有意义,因为那时连接已经断开。
然而我的目标是 Logoff
当用户 KeyboardInterrupt
。在这种情况下,我的会话看起来类似于:
## connecting to 127.0.0.1:12345
## spawning watchdog...
## spawning sender...
## spawning receiver...
## receiver: started!
## sender: started!
## sending to the server:
b'Login\r\nuser\r\nsecret\r\n\r\n'
Bye bye...
请注意,Logoff
尚未发送。
有什么想法吗?
这里你的调用树看起来像这样:
connect
|
+- _connect*
|
+- _watchdog*
|
+- _sender*
|
+- _receiver*
*
表示 4 个三重任务。 _connect
任务位于 nursery 块的末尾,等待子任务完成。 _watchdog
任务在await self._end_session.wait()
被阻塞,_sender
任务在await q.get()
被阻塞,_receiver
任务在await client_stream.receive_some(...)
被阻塞。
当您按下 control-C 时,标准的 Python 语义是 Python 代码的任何位 运行 突然引发 KeyboardInterrupt
。在这种情况下,您有 4 个不同的任务 运行,因此这些被阻止的操作之一会被随机选择 [1],并引发 KeyboardInterrupt
。这意味着可能会发生一些不同的事情:
如果
_watchdog
的wait
调用引发KeyboardInterrupt
,那么_watchdog
方法会立即退出,因此它甚至不会尝试发送logout
。然后作为展开堆栈的一部分,trio 取消所有其他任务,一旦它们退出,KeyboardInterrupt
就会继续向上传播,直到它到达connect
中的finally
块。此时你尝试使用self._end_session.set()
通知看门狗任务,但它不再是 运行,所以它没有注意到。如果
_sender
的q.get()
调用引发KeyboardInterrupt
,那么_sender
方法立即退出,所以即使_watchdog
确实要求它发送注销消息,它不会在那里注意到。在任何情况下,trio 都会继续取消看门狗和接收器任务,然后事情会像上面那样进行。如果
_receiver
的receive_all
调用加注KeyboardInterrupt
...同样的事情发生。细微之处:
_connect
也可以接收KeyboardInterrupt
,它做同样的事情:取消所有子项,然后等待它们停止,然后再允许KeyboardInterrupt
继续传播。
如果你想可靠地捕获 control-C 然后用它做一些事情,那么它在某个随机点被引发的事情是相当麻烦的。最简单的方法是使用 Trio's support for catching signals 捕捉 signal.SIGINT
信号,这是 Python 通常转换为 KeyboardInterrupt
的信号。 ("INT" 代表 "interrupt"。)类似于:
async def _control_c_watcher(self):
# This API is currently a little cumbersome, sorry, see
# https://github.com/python-trio/trio/issues/354
with trio.catch_signals({signal.SIGINT}) as batched_signal_aiter:
async for _ in batched_signal_aiter:
self._end_session.set()
# We exit the loop, restoring the normal behavior of
# control-C. This way hitting control-C once will try to
# do a polite shutdown, but if that gets stuck the user
# can hit control-C again to raise KeyboardInterrupt and
# force things to exit.
break
然后开始此 运行 以及您的其他任务。
您还遇到了问题,在您的 _watchdog
方法中,它将 logoff
请求放入队列中——因此安排了稍后由 _sender
任务发送的消息– 然后立即取消所有任务,这样 _sender
任务可能没有机会看到消息并对其做出反应!总的来说,我发现当我只在必要时使用任务时,我的代码工作得更好。与其有一个发送者任务,然后在你想发送消息时将消息放入队列,为什么不让想要发送消息的代码直接调用 stream.send_all
?你必须注意的一件事是,如果你有多个任务可能同时发送东西,你可能想使用 trio.Lock()
来确保它们不会通过调用 send_all
相互碰撞同时:
async def send_all(self, data):
async with self.send_lock:
await self.send_stream.send_all(data)
async def do_logoff(self):
# First send the message
await self.send_all(b"Logoff\r\n\r\n")
# And then, *after* the message has been sent, cancel the tasks
self.nursery.cancel()
如果你这样做,你可能能够完全摆脱看门狗任务和 _end_session
事件。
我在这里时关于您的代码的一些其他说明:
像这样多次调用
trio.run
很不寻常。正常的风格是在你的程序的顶部调用它一次,并将你所有的真实代码放在里面。一旦你退出trio.run
,trio 的所有状态都将丢失,你绝对不会 运行 任何并发任务(所以不可能 可能 正在监听和请注意您对_end_session.set()
的调用!)。通常,几乎所有 Trio 函数都假设您已经在调用trio.run
中。事实证明,现在你可以在开始 trio 之前调用trio.Queue()
而不会出现异常,但这基本上只是巧合。在
_sender
内部使用屏蔽对我来说很奇怪。屏蔽通常是您几乎从不想要使用的高级功能,我认为这也不例外。
希望对您有所帮助!如果您想更多地讨论 style/design 此类问题,但又担心它们可能过于模糊导致堆栈溢出 ("is this program designed well?"),请随时访问 trio chat channel.
[1] 好吧,实际上 trio 可能出于各种原因选择了主要任务,但这并不能保证,无论如何这对这里没有影响。