如果事件循环已经 运行,如何等待协程在方法内同步完成?

How to wait for coroutines to complete synchronously within method if event loop is already running?

我正在尝试创建一个基于 Python 的 CLI,它通过 websockets 与 web 服务通信。我遇到的一个问题是 CLI 向 Web 服务发出的请求间歇性地无法得到处理。查看来自 Web 服务的日志,我可以看出问题是由以下事实引起的:这些请求经常在套接字关闭的同时(甚至之后)发出:

2016-09-13 13:28:10,930 [22 ] INFO  DeviceBridge - Device bridge has opened
2016-09-13 13:28:11,936 [21 ] DEBUG DeviceBridge - Device bridge has received message
2016-09-13 13:28:11,937 [21 ] DEBUG DeviceBridge - Device bridge has received valid message
2016-09-13 13:28:11,937 [21 ] WARN  DeviceBridge - Unable to process request: {"value": false, "path": "testcube.pwms[0].enabled", "op": "replace"}
2016-09-13 13:28:11,936 [5  ] DEBUG DeviceBridge - Device bridge has closed

在我的 CLI 中,我定义了一个 class CommunicationService,它负责处理与 Web 服务的所有直接通信。在内部,它使用 websockets 包来处理通信,它本身是建立在 asyncio.

之上的

CommunicationService包含以下发送请求的方法:

def send_request(self, request: str) -> None:
    logger.debug('Sending request: {}'.format(request))
    asyncio.ensure_future(self._ws.send(request))

...其中 ws 是之前在另一种方法中打开的 websocket:

self._ws = await websockets.connect(websocket_address)

我想要的是能够等待 return 由 asyncio.ensure_future 编辑的未来,并且如果有必要,在之后休眠一会儿,以便让 Web 服务有时间处理在 websocket 关闭之前请求。

但是,由于send_request是一种同步方法,它不能简单地await这些期货。让它异步将毫无意义,因为没有什么可以等待它 returned 的协程对象。我也不能使用 loop.run_until_complete,因为循环在调用时已经 运行。

我发现有人描述的问题与我在 mail.python.org 遇到的问题非常相似。在该线程中发布的解决方案是在循环已经 运行:

的情况下使函数 return 成为协程对象
def aio_map(coro, iterable, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()

    coroutines = map(coro, iterable)
    coros = asyncio.gather(*coroutines, return_exceptions=True, loop=loop)

    if loop.is_running():
        return coros
    else:
        return loop.run_until_complete(coros)

这对我来说是不可能的,因为我正在使用 PyRx(Python 反应框架的实现)并且 send_request 仅作为 Rx observable 的订阅者调用,这意味着return 值被丢弃并且不适用于我的代码:

class AnonymousObserver(ObserverBase):
    ...
    def _on_next_core(self, value):
        self._next(value)

顺便说一句,我不确定这是否是 asyncio 经常遇到的某种问题,或者我只是不明白,但我发现它非常令人沮丧使用。在 C# 中(例如),我需要做的可能是像下面这样的事情:

void SendRequest(string request)
{
    this.ws.Send(request).Wait();
    // Task.Delay(500).Wait();  // Uncomment If necessary
}

与此同时,asyncio 的 "wait" 版本无济于事,只是 return 另一个我被迫丢弃的协程。

更新

我找到了解决这个问题的方法,而且似乎有效。我有一个在命令执行之后和 CLI 终止之前执行的异步回调,所以我只是将它从这个...

async def after_command():
    await comms.stop()

...为此:

async def after_command():
    await asyncio.sleep(0.25)  # Allow time for communication
    await comms.stop()

不过,我仍然很乐意收到此问题的任何答案以供将来参考。在其他情况下我可能无法依赖这样的解决方法,我仍然认为在 send_request 内执行延迟会更好,这样 CommunicationService 的客户就不必担心自己有时间问题。

关于文森特的问题:

Does your loop run in a different thread, or is send_request called by some callback?

一切都在同一个线程中运行 - 它由回调调用。发生的情况是我将所有命令定义为使用异步回调,并且在执行时其中一些将尝试向 Web 服务发送请求。由于它们是异步的,因此在通过 CLI 的顶层调用 loop.run_until_complete 执行它们之前它们不会执行此操作 - 这意味着当它们执行时循环是 运行中途执行并发出此请求(通过间接调用 send_request)。

更新 2

这是基于 Vincent 提议添加 "done" 回调的解决方案。

一个新的布尔字段 _busy 添加到 CommunicationService 以表示通信 activity 是否正在发生。

CommunicationService.send_request 被修改为在发送请求之前设置 _busy 为真,然后提供回调给 _ws.send 以在完成后重置 _busy

def send_request(self, request: str) -> None:
    logger.debug('Sending request: {}'.format(request))

    def callback(_):
        self._busy = False

    self._busy = True
    asyncio.ensure_future(self._ws.send(request)).add_done_callback(callback)

CommunicationService.stop 现在实现为在继续之前等待此标志设置为 false:

async def stop(self) -> None:
    """
    Terminate communications with TestCube Web Service.
    """
    if self._listen_task is None or self._ws is None:
        return

    # Wait for comms activity to stop.
    while self._busy:
        await asyncio.sleep(0.1)

    # Allow short delay after final request is processed.
    await asyncio.sleep(0.1)

    self._listen_task.cancel()
    await asyncio.wait([self._listen_task, self._ws.close()])

    self._listen_task = None
    self._ws = None
    logger.info('Terminated connection to TestCube Web Service')

这似乎也行得通,至少通过这种方式,所有通信时序逻辑都按应有的方式封装在 CommunicationService class 中。

更新 3

基于 Vincent 提议的更好的解决方案。

而不是 self._busy 我们有 self._send_request_tasks = [].

send_request 实现:

def send_request(self, request: str) -> None:
    logger.debug('Sending request: {}'.format(request))

    task = asyncio.ensure_future(self._ws.send(request))
    self._send_request_tasks.append(task)

stop 实现:

async def stop(self) -> None:
    if self._listen_task is None or self._ws is None:
        return

    # Wait for comms activity to stop.
    if self._send_request_tasks:
        await asyncio.wait(self._send_request_tasks)
    ...

您可以使用 set 个任务:

self._send_request_tasks = set()

使用 ensure_future 安排任务并使用 add_done_callback 清理:

def send_request(self, request: str) -> None:
    task = asyncio.ensure_future(self._ws.send(request))
    self._send_request_tasks.add(task)
    task.add_done_callback(self._send_request_tasks.remove)

并等待 set 个任务完成:

async def stop(self):
    if self._send_request_tasks:
        await asyncio.wait(self._send_request_tasks)

鉴于您不在异步函数中,您可以使用 yield from 关键字来有效地自己实现 await。以下代码将阻塞直到将来 returns:

def send_request(self, request: str) -> None:
    logger.debug('Sending request: {}'.format(request))
    future = asyncio.ensure_future(self._ws.send(request))
    yield from future.__await__()