如果事件循环已经 运行,如何等待协程在方法内同步完成?
How to wait for coroutines to complete synchronously within method if event loop is already running?
我正在尝试创建一个基于 Python 的 CLI,它通过 websockets 与 web 服务通信。我遇到的一个问题是 CLI 向 Web 服务发出的请求间歇性地无法得到处理。查看来自 Web 服务的日志,我可以看出问题是由以下事实引起的:这些请求经常在套接字关闭的同时(甚至之后)发出:
2016-09-13 13:28:10,930 [22 ] INFO DeviceBridge - Device bridge has opened
2016-09-13 13:28:11,936 [21 ] DEBUG DeviceBridge - Device bridge has received message
2016-09-13 13:28:11,937 [21 ] DEBUG DeviceBridge - Device bridge has received valid message
2016-09-13 13:28:11,937 [21 ] WARN DeviceBridge - Unable to process request: {"value": false, "path": "testcube.pwms[0].enabled", "op": "replace"}
2016-09-13 13:28:11,936 [5 ] DEBUG DeviceBridge - Device bridge has closed
在我的 CLI 中,我定义了一个 class CommunicationService
,它负责处理与 Web 服务的所有直接通信。在内部,它使用 websockets
包来处理通信,它本身是建立在 asyncio
.
之上的
CommunicationService
包含以下发送请求的方法:
def send_request(self, request: str) -> None:
logger.debug('Sending request: {}'.format(request))
asyncio.ensure_future(self._ws.send(request))
...其中 ws
是之前在另一种方法中打开的 websocket:
self._ws = await websockets.connect(websocket_address)
我想要的是能够等待 return 由 asyncio.ensure_future
编辑的未来,并且如果有必要,在之后休眠一会儿,以便让 Web 服务有时间处理在 websocket 关闭之前请求。
但是,由于send_request
是一种同步方法,它不能简单地await
这些期货。让它异步将毫无意义,因为没有什么可以等待它 returned 的协程对象。我也不能使用 loop.run_until_complete
,因为循环在调用时已经 运行。
我发现有人描述的问题与我在 mail.python.org 遇到的问题非常相似。在该线程中发布的解决方案是在循环已经 运行:
的情况下使函数 return 成为协程对象
def aio_map(coro, iterable, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
coroutines = map(coro, iterable)
coros = asyncio.gather(*coroutines, return_exceptions=True, loop=loop)
if loop.is_running():
return coros
else:
return loop.run_until_complete(coros)
这对我来说是不可能的,因为我正在使用 PyRx(Python 反应框架的实现)并且 send_request
仅作为 Rx observable 的订阅者调用,这意味着return 值被丢弃并且不适用于我的代码:
class AnonymousObserver(ObserverBase):
...
def _on_next_core(self, value):
self._next(value)
顺便说一句,我不确定这是否是 asyncio
经常遇到的某种问题,或者我只是不明白,但我发现它非常令人沮丧使用。在 C# 中(例如),我需要做的可能是像下面这样的事情:
void SendRequest(string request)
{
this.ws.Send(request).Wait();
// Task.Delay(500).Wait(); // Uncomment If necessary
}
与此同时,asyncio
的 "wait" 版本无济于事,只是 return 另一个我被迫丢弃的协程。
更新
我找到了解决这个问题的方法,而且似乎有效。我有一个在命令执行之后和 CLI 终止之前执行的异步回调,所以我只是将它从这个...
async def after_command():
await comms.stop()
...为此:
async def after_command():
await asyncio.sleep(0.25) # Allow time for communication
await comms.stop()
不过,我仍然很乐意收到此问题的任何答案以供将来参考。在其他情况下我可能无法依赖这样的解决方法,我仍然认为在 send_request
内执行延迟会更好,这样 CommunicationService
的客户就不必担心自己有时间问题。
关于文森特的问题:
Does your loop run in a different thread, or is send_request called by some callback?
一切都在同一个线程中运行 - 它由回调调用。发生的情况是我将所有命令定义为使用异步回调,并且在执行时其中一些将尝试向 Web 服务发送请求。由于它们是异步的,因此在通过 CLI 的顶层调用 loop.run_until_complete
执行它们之前它们不会执行此操作 - 这意味着当它们执行时循环是 运行中途执行并发出此请求(通过间接调用 send_request
)。
更新 2
这是基于 Vincent 提议添加 "done" 回调的解决方案。
一个新的布尔字段 _busy
添加到 CommunicationService
以表示通信 activity 是否正在发生。
CommunicationService.send_request
被修改为在发送请求之前设置 _busy
为真,然后提供回调给 _ws.send
以在完成后重置 _busy
:
def send_request(self, request: str) -> None:
logger.debug('Sending request: {}'.format(request))
def callback(_):
self._busy = False
self._busy = True
asyncio.ensure_future(self._ws.send(request)).add_done_callback(callback)
CommunicationService.stop
现在实现为在继续之前等待此标志设置为 false:
async def stop(self) -> None:
"""
Terminate communications with TestCube Web Service.
"""
if self._listen_task is None or self._ws is None:
return
# Wait for comms activity to stop.
while self._busy:
await asyncio.sleep(0.1)
# Allow short delay after final request is processed.
await asyncio.sleep(0.1)
self._listen_task.cancel()
await asyncio.wait([self._listen_task, self._ws.close()])
self._listen_task = None
self._ws = None
logger.info('Terminated connection to TestCube Web Service')
这似乎也行得通,至少通过这种方式,所有通信时序逻辑都按应有的方式封装在 CommunicationService
class 中。
更新 3
基于 Vincent 提议的更好的解决方案。
而不是 self._busy
我们有 self._send_request_tasks = []
.
新 send_request
实现:
def send_request(self, request: str) -> None:
logger.debug('Sending request: {}'.format(request))
task = asyncio.ensure_future(self._ws.send(request))
self._send_request_tasks.append(task)
新 stop
实现:
async def stop(self) -> None:
if self._listen_task is None or self._ws is None:
return
# Wait for comms activity to stop.
if self._send_request_tasks:
await asyncio.wait(self._send_request_tasks)
...
您可以使用 set
个任务:
self._send_request_tasks = set()
使用 ensure_future
安排任务并使用 add_done_callback
清理:
def send_request(self, request: str) -> None:
task = asyncio.ensure_future(self._ws.send(request))
self._send_request_tasks.add(task)
task.add_done_callback(self._send_request_tasks.remove)
并等待 set
个任务完成:
async def stop(self):
if self._send_request_tasks:
await asyncio.wait(self._send_request_tasks)
鉴于您不在异步函数中,您可以使用 yield from
关键字来有效地自己实现 await
。以下代码将阻塞直到将来 returns:
def send_request(self, request: str) -> None:
logger.debug('Sending request: {}'.format(request))
future = asyncio.ensure_future(self._ws.send(request))
yield from future.__await__()
我正在尝试创建一个基于 Python 的 CLI,它通过 websockets 与 web 服务通信。我遇到的一个问题是 CLI 向 Web 服务发出的请求间歇性地无法得到处理。查看来自 Web 服务的日志,我可以看出问题是由以下事实引起的:这些请求经常在套接字关闭的同时(甚至之后)发出:
2016-09-13 13:28:10,930 [22 ] INFO DeviceBridge - Device bridge has opened
2016-09-13 13:28:11,936 [21 ] DEBUG DeviceBridge - Device bridge has received message
2016-09-13 13:28:11,937 [21 ] DEBUG DeviceBridge - Device bridge has received valid message
2016-09-13 13:28:11,937 [21 ] WARN DeviceBridge - Unable to process request: {"value": false, "path": "testcube.pwms[0].enabled", "op": "replace"}
2016-09-13 13:28:11,936 [5 ] DEBUG DeviceBridge - Device bridge has closed
在我的 CLI 中,我定义了一个 class CommunicationService
,它负责处理与 Web 服务的所有直接通信。在内部,它使用 websockets
包来处理通信,它本身是建立在 asyncio
.
CommunicationService
包含以下发送请求的方法:
def send_request(self, request: str) -> None:
logger.debug('Sending request: {}'.format(request))
asyncio.ensure_future(self._ws.send(request))
...其中 ws
是之前在另一种方法中打开的 websocket:
self._ws = await websockets.connect(websocket_address)
我想要的是能够等待 return 由 asyncio.ensure_future
编辑的未来,并且如果有必要,在之后休眠一会儿,以便让 Web 服务有时间处理在 websocket 关闭之前请求。
但是,由于send_request
是一种同步方法,它不能简单地await
这些期货。让它异步将毫无意义,因为没有什么可以等待它 returned 的协程对象。我也不能使用 loop.run_until_complete
,因为循环在调用时已经 运行。
我发现有人描述的问题与我在 mail.python.org 遇到的问题非常相似。在该线程中发布的解决方案是在循环已经 运行:
的情况下使函数 return 成为协程对象def aio_map(coro, iterable, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
coroutines = map(coro, iterable)
coros = asyncio.gather(*coroutines, return_exceptions=True, loop=loop)
if loop.is_running():
return coros
else:
return loop.run_until_complete(coros)
这对我来说是不可能的,因为我正在使用 PyRx(Python 反应框架的实现)并且 send_request
仅作为 Rx observable 的订阅者调用,这意味着return 值被丢弃并且不适用于我的代码:
class AnonymousObserver(ObserverBase):
...
def _on_next_core(self, value):
self._next(value)
顺便说一句,我不确定这是否是 asyncio
经常遇到的某种问题,或者我只是不明白,但我发现它非常令人沮丧使用。在 C# 中(例如),我需要做的可能是像下面这样的事情:
void SendRequest(string request)
{
this.ws.Send(request).Wait();
// Task.Delay(500).Wait(); // Uncomment If necessary
}
与此同时,asyncio
的 "wait" 版本无济于事,只是 return 另一个我被迫丢弃的协程。
更新
我找到了解决这个问题的方法,而且似乎有效。我有一个在命令执行之后和 CLI 终止之前执行的异步回调,所以我只是将它从这个...
async def after_command():
await comms.stop()
...为此:
async def after_command():
await asyncio.sleep(0.25) # Allow time for communication
await comms.stop()
不过,我仍然很乐意收到此问题的任何答案以供将来参考。在其他情况下我可能无法依赖这样的解决方法,我仍然认为在 send_request
内执行延迟会更好,这样 CommunicationService
的客户就不必担心自己有时间问题。
关于文森特的问题:
Does your loop run in a different thread, or is send_request called by some callback?
一切都在同一个线程中运行 - 它由回调调用。发生的情况是我将所有命令定义为使用异步回调,并且在执行时其中一些将尝试向 Web 服务发送请求。由于它们是异步的,因此在通过 CLI 的顶层调用 loop.run_until_complete
执行它们之前它们不会执行此操作 - 这意味着当它们执行时循环是 运行中途执行并发出此请求(通过间接调用 send_request
)。
更新 2
这是基于 Vincent 提议添加 "done" 回调的解决方案。
一个新的布尔字段 _busy
添加到 CommunicationService
以表示通信 activity 是否正在发生。
CommunicationService.send_request
被修改为在发送请求之前设置 _busy
为真,然后提供回调给 _ws.send
以在完成后重置 _busy
:
def send_request(self, request: str) -> None:
logger.debug('Sending request: {}'.format(request))
def callback(_):
self._busy = False
self._busy = True
asyncio.ensure_future(self._ws.send(request)).add_done_callback(callback)
CommunicationService.stop
现在实现为在继续之前等待此标志设置为 false:
async def stop(self) -> None:
"""
Terminate communications with TestCube Web Service.
"""
if self._listen_task is None or self._ws is None:
return
# Wait for comms activity to stop.
while self._busy:
await asyncio.sleep(0.1)
# Allow short delay after final request is processed.
await asyncio.sleep(0.1)
self._listen_task.cancel()
await asyncio.wait([self._listen_task, self._ws.close()])
self._listen_task = None
self._ws = None
logger.info('Terminated connection to TestCube Web Service')
这似乎也行得通,至少通过这种方式,所有通信时序逻辑都按应有的方式封装在 CommunicationService
class 中。
更新 3
基于 Vincent 提议的更好的解决方案。
而不是 self._busy
我们有 self._send_request_tasks = []
.
新 send_request
实现:
def send_request(self, request: str) -> None:
logger.debug('Sending request: {}'.format(request))
task = asyncio.ensure_future(self._ws.send(request))
self._send_request_tasks.append(task)
新 stop
实现:
async def stop(self) -> None:
if self._listen_task is None or self._ws is None:
return
# Wait for comms activity to stop.
if self._send_request_tasks:
await asyncio.wait(self._send_request_tasks)
...
您可以使用 set
个任务:
self._send_request_tasks = set()
使用 ensure_future
安排任务并使用 add_done_callback
清理:
def send_request(self, request: str) -> None:
task = asyncio.ensure_future(self._ws.send(request))
self._send_request_tasks.add(task)
task.add_done_callback(self._send_request_tasks.remove)
并等待 set
个任务完成:
async def stop(self):
if self._send_request_tasks:
await asyncio.wait(self._send_request_tasks)
鉴于您不在异步函数中,您可以使用 yield from
关键字来有效地自己实现 await
。以下代码将阻塞直到将来 returns:
def send_request(self, request: str) -> None:
logger.debug('Sending request: {}'.format(request))
future = asyncio.ensure_future(self._ws.send(request))
yield from future.__await__()