How can I notify RxPY observers on separate threads using asyncio?

(Note: the background to this question is rather long-winded, but there is an SSCCE at the bottom you can skip to.)

Background

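I'm trying to develop a Python-based CLI for interacting with a web service. My codebase has a CommunicationService class that handles all direct communication with the web service. It exposes a received_response property that returns an Observable (from RxPY), which other objects can subscribe to so they are notified whenever a response arrives from the web service.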

My CLI logic is built on the click library, and one of my subcommands is implemented as follows:

async def enabled(self, request: str, response_handler: Callable[[str], Tuple[bool, str]]) -> None:
    self._generate_request(request)
    if response_handler is None:
        return None

    while True:
        response = await self.on_response
        success, value = response_handler(response)
        print(success, value)
        if success:
            return value

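What happens here (when response_handler is not None) is that the subcommand runs as a coroutine that waits for responses from the web service (self.on_response == CommunicationService.received_response) and returns the processed value from the first response it is able to handle.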

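I'm trying to test the behaviour of my CLI with test cases that mock out CommunicationService entirely: a fake Subject (which can act as an Observable) is created, and CommunicationService.received_response is mocked to return it. As part of the test, the subject's on_next method is called to pass mock web service responses back to the production code: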

@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
    context.mock_received_response_subject.on_next(context.text)

I use a click 'result callback' function, which is called at the end of the CLI invocation and blocks until the coroutine (the subcommand) completes:

@cli.resultcallback()
def _handle_command_task(task: Coroutine, **_) -> None:
    if task:
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(task)
        loop.close()
        print('RESULT:', result) 

Problem

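At the start of the test, I run CliRunner.invoke to kick off the whole shebang. The problem is that this is a blocking call: it blocks the thread until the CLI has finished and returned a result. That is no help when I need my test thread to carry on so it can generate mock web service responses concurrently.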

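I think what I need to do is run CliRunner.invoke on a new thread using a ThreadPoolExecutor. That lets the test logic continue on the original thread and execute the @when step shown above. However, the notifications published with mock_received_response_subject.on_next don't seem to make the subcommand resume execution.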

I believe the solution will involve RxPY's AsyncIOScheduler, but I've found its documentation rather sparse and unhelpful.
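As a point of reference, here is a minimal stdlib-only sketch of the kind of cross-thread hand-off this seems to call for, with RxPY left out entirely. It assumes the event loop runs in a worker thread (as it would inside CliRunner.invoke on a ThreadPoolExecutor), and run_loop_in_thread and main are hypothetical names. The key point is that the main thread never touches the loop's future directly. It schedules the update with loop.call_soon_threadsafe, so the update runs on the loop's own thread and wakes the loop up.

```python
import asyncio
import threading


def run_loop_in_thread(loop: asyncio.AbstractEventLoop,
                       fut: asyncio.Future, results: list) -> None:
    # Hypothetical stand-in for the CLI thread: it owns the loop and
    # blocks until the future is resolved.
    asyncio.set_event_loop(loop)
    results.append(loop.run_until_complete(fut))
    loop.close()


def main() -> str:
    loop = asyncio.new_event_loop()
    fut = loop.create_future()  # bound to the worker's loop
    results: list = []
    worker = threading.Thread(target=run_loop_in_thread, args=(loop, fut, results))
    worker.start()

    # From the main thread, do not call fut.set_result directly; queue the
    # call on the loop's own thread so the loop is woken up safely.
    loop.call_soon_threadsafe(fut.set_result, "foo")

    worker.join(timeout=5)
    return results[0]


if __name__ == "__main__":
    print(main())  # prints: foo
```

The same rule would apply to an RxPY Subject: whatever calls on_next should run on the thread that owns the loop the awaiting coroutine runs on.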

SSCCE

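I hope the snippet below captures the essence of the problem. If it can be modified to work, I should be able to apply the same solution to my real code and get it behaving the way I want.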

import asyncio
import logging
import sys
import time

import click
from click.testing import CliRunner
from rx.subjects import Subject

web_response_subject = Subject()
web_response_observable = web_response_subject.as_observable()

thread_loop = asyncio.new_event_loop()


@click.group()
def cli():
    asyncio.set_event_loop(thread_loop)


@cli.resultcallback()
def result_handler(task, **_):
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(task) # Should block until subject publishes value
    loop.close()

    print(result)


@cli.command()
async def get_web_response():
    return await web_response_observable


def test():
    runner = CliRunner()
    future = thread_loop.run_in_executor(None, runner.invoke, cli, ['get_web_response'])
    time.sleep(1)
    web_response_subject.on_next('foo') # Simulate reception of web response.
    time.sleep(1)
    result = future.result()
    print(result.output)

logging.basicConfig(
    level=logging.DEBUG,
    format='%(threadName)10s %(name)18s: %(message)s',
    stream=sys.stderr,
)

test()

Current behaviour

The program hangs when run, blocking at result = loop.run_until_complete(task).

Acceptance criteria

The program terminates and prints foo on stdout.

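Update 1

With Vincent's help, I've made a few changes to my code.

Relay.enabled (the subcommand that waits for web service responses so it can process them) is now implemented as follows: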

async def enabled(self, request: str, response_handler: Callable[[str], Tuple[bool, str]]) -> None:
    self._generate_request(request)

    if response_handler is None:
        return None

    return await self.on_response \
        .select(response_handler) \
        .where(lambda result, i: result[0]) \
        .select(lambda result, index: result[1]) \
        .first()

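I wasn't sure how awaiting an RxPY observable behaves: does it return control to the caller on every element produced, or only when the observable completes (or errors)? I now know it's the latter. Honestly, that feels like the more natural choice, and it has made the implementation of this function much more elegant and reactive.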
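For comparison, here is a stdlib-only sketch of the same "first successfully handled response" logic that the .select(...).where(...).select(...).first() chain expresses. It swaps the observable for an asyncio.Queue, which is an assumption made for illustration and not what RxPY does internally. relay_handler and demo are hypothetical, and asyncio.run requires Python 3.7+. Like .first(), it stops after the first response the handler accepts.

```python
import asyncio
from typing import Callable, Optional, Tuple

Handler = Callable[[str], Tuple[bool, Optional[str]]]


async def first_handled(responses: "asyncio.Queue[str]", handler: Handler) -> Optional[str]:
    # Plain-asyncio analogue of .select(handler).where(ok).select(value).first():
    # consume responses until the handler reports success, then return its value.
    while True:
        response = await responses.get()
        success, value = handler(response)
        if success:
            return value


def relay_handler(response: str) -> Tuple[bool, Optional[str]]:
    # Hypothetical handler: only responses prefixed with "ok:" count as handled.
    if response.startswith("ok:"):
        return True, response[len("ok:"):]
    return False, None


async def demo() -> Optional[str]:
    responses: "asyncio.Queue[str]" = asyncio.Queue()
    for item in ("noise", "ok:True", "ok:later"):
        responses.put_nowait(item)
    return await first_handled(responses, relay_handler)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # prints: True
```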

I've also changed the test step that generates mock web service responses:

@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
    loop = asyncio.get_event_loop()
    loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, context.text)

Unfortunately, this won't work as it stands, because the CLI is being invoked in its own thread...

@when('the CLI is run with "{arguments}"')
def step_impl(context, arguments):
    loop = asyncio.get_event_loop()
    if 'async.cli' in context.tags:
        context.async_result = loop.run_in_executor(None, context.cli_runner.invoke, testcube.cli, arguments.split())
    else:
        ...

...and when invoked, the CLI creates its own thread-specific event loop...

def cli(context, hostname, port):
    _initialize_logging(context.meta['click_log.core.logger']['level'])

    # Create a new event loop for processing commands asynchronously on.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    ...

What I think I need is a way for my test step to invoke the CLI on a new thread and then get hold of the event loop it is using:

@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
    loop = _get_cli_event_loop() # Needs to be implemented.
    loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, context.text)
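One possible way to implement the hypothetical _get_cli_event_loop, sketched with the standard library only, is to have the CLI thread publish the loop it creates into a shared holder guarded by a threading.Event. LoopHandle, fake_cli and on_next are illustrative stand-ins, and the list of callbacks plays the role of the RxPY Subject. Wiring such a holder into the real click cli function is an assumption and would mean touching production code, which the mocking approach in Update 2 avoids.

```python
import asyncio
import threading
from typing import Callable, List, Optional


class LoopHandle:
    """Hypothetical holder that the CLI thread fills in once it has created its loop."""

    def __init__(self) -> None:
        self._ready = threading.Event()
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    def publish(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._ready.set()

    def get(self, timeout: float = 5.0) -> asyncio.AbstractEventLoop:
        # Block the test thread until the CLI thread has published its loop.
        if not self._ready.wait(timeout):
            raise TimeoutError("CLI thread never published its event loop")
        assert self._loop is not None
        return self._loop


def on_next(subscribers: List[Callable[[str], None]], value: str) -> None:
    # Crude stand-in for Subject.on_next: notify every subscriber.
    for callback in subscribers:
        callback(value)


def fake_cli(handle: LoopHandle, subscribers: list, results: list) -> None:
    # Hypothetical stand-in for the click cli(): create a thread-specific loop,
    # publish it, then block until a response arrives.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    response = loop.create_future()
    subscribers.append(response.set_result)  # "subscribe" to the fake subject
    handle.publish(loop)
    results.append(loop.run_until_complete(response))
    loop.close()


def main() -> str:
    handle = LoopHandle()
    subscribers: list = []
    results: list = []
    cli_thread = threading.Thread(target=fake_cli, args=(handle, subscribers, results))
    cli_thread.start()
    loop = handle.get()  # what _get_cli_event_loop() could return
    # Deliver the mock response on the CLI thread's own loop.
    loop.call_soon_threadsafe(on_next, subscribers, "foo")
    cli_thread.join(timeout=5)
    return results[0]


if __name__ == "__main__":
    print(main())  # prints: foo
```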

Update 2

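There doesn't seem to be an easy way to get hold of the event loop that a particular thread has created for its own use, so I took Victor's advice and mocked asyncio.new_event_loop to return an event loop that my test code creates and stores: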

import asyncio
from contextlib import ExitStack
from unittest.mock import patch

def _apply_mock_event_loop_patch(context):
    # Close any already-existing exit stacks.
    if hasattr(context, 'mock_event_loop_exit_stack'):
        context.mock_event_loop_exit_stack.close()

    context.test_loop = asyncio.new_event_loop()
    print(context.test_loop)
    context.mock_event_loop_exit_stack = ExitStack()
    context.mock_event_loop_exit_stack.enter_context(
        patch.object(asyncio, 'new_event_loop', spec=True, return_value=context.test_loop))
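Here is a small stdlib-only check of why this patch reaches the CLI thread. patch.object replaces the new_event_loop attribute on the asyncio module itself, so a call made from another thread while the patch is active also gets the test's loop. production_code is a hypothetical stand-in for the CLI's cli function.

```python
import asyncio
import threading
from contextlib import ExitStack
from unittest.mock import patch


def production_code(seen: list) -> None:
    # Hypothetical stand-in for the CLI's cli(): it creates "its own" loop.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    seen.append(loop)


def main() -> bool:
    # Create the real loop before patching, otherwise we would get the mock.
    test_loop = asyncio.new_event_loop()
    seen: list = []
    with ExitStack() as stack:
        stack.enter_context(
            patch.object(asyncio, "new_event_loop", return_value=test_loop))
        # The patch replaces the module attribute, so other threads see it too.
        worker = threading.Thread(target=production_code, args=(seen,))
        worker.start()
        worker.join()
    test_loop.close()
    return seen[0] is test_loop


if __name__ == "__main__":
    print(main())  # prints: True
```

Note that only the attribute on the asyncio package is replaced. Code that had already done `from asyncio import new_event_loop` before the patch was applied would still call the real function.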

I changed the 'mock web response received' test step to do the following:

@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
    loop = context.test_loop
    loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, context.text)

The good news is that the Relay.enabled coroutine now actually fires when this step runs!

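The only remaining problem is the final test step, where I wait on the future I got from running the CLI in its own thread and check that the CLI printed the expected output on stdout: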
@then('the CLI should print "{output}"')
def step_impl(context, output):
    if 'async.cli' in context.tags:
        loop = asyncio.get_event_loop() # main loop, not test loop
        result = loop.run_until_complete(context.async_result)
    else:
        result = context.result
    assert_that(result.output, equal_to(output))

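I've tried playing around with this, but I can't get context.async_result (where I store the future returned by loop.run_in_executor) to transition cleanly to done and return the result. With the current implementation, the first test (1.1) errors and the second (1.2) hangs indefinitely: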

 @mock.comms @async.cli @wip
  Scenario Outline: Querying relay enable state -- @1.1                           # testcube/tests/features/relay.feature:45
    When the user queries the enable state of relay 0                             # testcube/tests/features/steps/relay.py:17 0.003s
    Then the CLI should query the web service about the enable state of relay 0   # testcube/tests/features/steps/relay.py:48 0.000s
    When the communications service receives a response from TestCube Web Service # testcube/tests/features/steps/core.py:58 0.000s
      """
      {'module':'relays','path':'relays[0].enabled','data':[True]}'
      """
    Then the CLI should print "True"                                              # testcube/tests/features/steps/core.py:94 0.003s
      Traceback (most recent call last):
        File "/Users/davidfallah/testcube_env/lib/python3.5/site-packages/behave/model.py", line 1456, in run
          match.run(runner.context)
        File "/Users/davidfallah/testcube_env/lib/python3.5/site-packages/behave/model.py", line 1903, in run
          self.func(context, *args, **kwargs)
        File "testcube/tests/features/steps/core.py", line 99, in step_impl
          result = loop.run_until_complete(context.async_result)
        File "/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
          return future.result()
        File "/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 274, in result
          raise self._exception
        File "/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/concurrent/futures/thread.py", line 55, in run
          result = self.fn(*self.args, **self.kwargs)
        File "/Users/davidfallah/testcube_env/lib/python3.5/site-packages/click/testing.py", line 299, in invoke
          output = out.getvalue()
      ValueError: I/O operation on closed file.

      Captured stdout:
      RECEIVED WEB RESPONSE: {'module':'relays','path':'relays[0].enabled','data':[True]}'
      <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py:431]>

  @mock.comms @async.cli @wip
  Scenario Outline: Querying relay enable state -- @1.2                           # testcube/tests/features/relay.feature:46
    When the user queries the enable state of relay 1                             # testcube/tests/features/steps/relay.py:17 0.005s
    Then the CLI should query the web service about the enable state of relay 1   # testcube/tests/features/steps/relay.py:48 0.001s
    When the communications service receives a response from TestCube Web Service # testcube/tests/features/steps/core.py:58 0.000s
      """
      {'module':'relays','path':'relays[1].enabled','data':[False]}'
      """
RECEIVED WEB RESPONSE: {'module':'relays','path':'relays[1].enabled','data':[False]}'
    Then the CLI should print "False"                                             # testcube/tests/features/steps/core.py:94

Part 3: The Ending

Screw all this async multi-threaded stuff, I'm too dumb for it.

First, instead of describing the scenario like this...

When the user queries the enable state of relay <relay_id>
Then the CLI should query the web service about the enable state of relay <relay_id>
When the communications service receives a response from TestCube Web Service:
  """
  {"module":"relays","path":"relays[<relay_id>].enabled","data":[<relay_enabled>]}
  """
Then the CLI should print "<relay_enabled>"

...describe it like this:

Given the communications service will respond to requests:
  """
  {"module":"relays","path":"relays[<relay_id>].enabled","data":[<relay_enabled>]}
  """
When the user queries the enable state of relay <relay_id>
Then the CLI should query the web service about the enable state of relay <relay_id>
And the CLI should print "<relay_enabled>"

Then implement the new Given step:

@given('the communications service will respond to requests')
def step_impl(context):
    response = context.text

    def publish_mock_response(_):
        loop = context.test_loop
        loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, response)

    # Configure the mock comms service to publish a mock response when a request is made.
    instance = context.mock_comms.return_value
    instance.send_request.on_next.side_effect = publish_mock_response

2 features passed, 0 failed, 0 skipped
22 scenarios passed, 0 failed, 0 skipped
58 steps passed, 0 failed, 0 skipped, 0 undefined
Took 0m0.111s
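The Given step in this final version produces each mock response at the moment a request is sent, instead of relying on a separate When step that has to race against the CLI thread. Here is a stdlib-only sketch of the same side_effect wiring. It uses an asyncio.Queue as a stand-in for the mocked Subject and query_relay as a hypothetical stand-in for the subcommand, and it assumes Python 3.7+ for asyncio.run and get_running_loop.

```python
import asyncio
from unittest.mock import MagicMock

CANNED = '{"module":"relays","path":"relays[0].enabled","data":[true]}'


async def query_relay(comms, responses: "asyncio.Queue[str]") -> str:
    # Hypothetical stand-in for the subcommand: send a request, await the reply.
    comms.send_request.on_next("relays[0].enabled")
    return await responses.get()


async def demo() -> str:
    loop = asyncio.get_running_loop()
    responses: "asyncio.Queue[str]" = asyncio.Queue()  # stands in for the mocked Subject

    def publish_mock_response(_request):
        # Mirrors the Given step: every request triggers the canned response,
        # scheduled thread-safely onto the loop the code under test runs on.
        loop.call_soon_threadsafe(responses.put_nowait, CANNED)

    mock_comms = MagicMock()            # plays the patched CommunicationService class
    instance = mock_comms.return_value  # the instance production code receives
    instance.send_request.on_next.side_effect = publish_mock_response

    reply = await query_relay(mock_comms(), responses)
    instance.send_request.on_next.assert_called_once_with("relays[0].enabled")
    return reply


if __name__ == "__main__":
    print(asyncio.run(demo()))
```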

I see two problems with your code:

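  • asyncio is not thread-safe unless you use call_soon_threadsafe or run_coroutine_threadsafe. RxPy doesn't use either of those in Observable.to_future, so you have to access RxPy objects from the same thread that runs the asyncio event loop.
  • RxPy sets the future's result when on_completed is called, so awaiting an observable returns the last object emitted. This means you have to call both on_next and on_completed to make await return.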
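To make the second point concrete, here is a stdlib-only model of the await semantics it describes. MiniSubject is hypothetical and is not RxPY's real code, which lives in Observable.to_future. It only mimics the described behaviour: awaiting it resolves with the last value emitted, and only once on_completed is called. asyncio.run requires Python 3.7+.

```python
import asyncio
from typing import Any, List


class MiniSubject:
    """Hypothetical model of the await semantics described above (not RxPY)."""

    def __init__(self) -> None:
        self._waiters: List[asyncio.Future] = []
        self._last: Any = None

    def on_next(self, value: Any) -> None:
        # Emitting a value only records it; nobody awaiting is woken yet.
        self._last = value

    def on_completed(self) -> None:
        # Completion is what resolves every pending await.
        for waiter in self._waiters:
            if not waiter.done():
                waiter.set_result(self._last)

    def __await__(self):
        waiter = asyncio.get_running_loop().create_future()
        self._waiters.append(waiter)
        return waiter.__await__()


async def with_completion() -> Any:
    subject = MiniSubject()
    loop = asyncio.get_running_loop()
    loop.call_later(0.01, subject.on_next, "foo")
    loop.call_later(0.02, subject.on_next, "bar")
    loop.call_later(0.03, subject.on_completed)
    return await subject  # resolves with "bar", the last value


async def without_completion() -> bool:
    subject = MiniSubject()
    asyncio.get_running_loop().call_soon(subject.on_next, "foo")
    try:
        await asyncio.wait_for(subject, timeout=0.1)
    except asyncio.TimeoutError:
        return True  # on_next alone never resolved the await
    return False


if __name__ == "__main__":
    print(asyncio.run(with_completion()))     # prints: bar
    print(asyncio.run(without_completion()))  # prints: True
```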

Here is a working example:

import click
import asyncio
from rx.subjects import Subject
from click.testing import CliRunner

web_response_subject = Subject()
web_response_observable = web_response_subject.as_observable()
main_loop = asyncio.get_event_loop()

@click.group()
def cli():
    pass

@cli.resultcallback()
def result_handler(task, **_):
    future = asyncio.run_coroutine_threadsafe(task, main_loop)
    print(future.result())

@cli.command()
async def get_web_response():
    return await web_response_observable

def test():
    runner = CliRunner()
    future = main_loop.run_in_executor(
        None, runner.invoke, cli, ['get_web_response'])
    main_loop.call_later(1, web_response_subject.on_next, 'foo')
    main_loop.call_later(2, web_response_subject.on_completed)
    result = main_loop.run_until_complete(future)
    print(result.output, end='')

if __name__ == '__main__':
    test()
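Here is the core of the example above with click and RxPY stripped out. A blocking call runs in an executor thread and uses asyncio.run_coroutine_threadsafe to hand its coroutine back to the loop in the main thread, which stays free to deliver the response. blocking_invoke and wait_for_reply are hypothetical stand-ins for CliRunner.invoke plus the result callback, and for the subcommand. The sketch assumes Python 3.7+.

```python
import asyncio
from typing import Any, Callable, Coroutine


def blocking_invoke(loop: asyncio.AbstractEventLoop,
                    make_coro: Callable[[], Coroutine[Any, Any, str]]) -> str:
    # Runs in an executor thread, playing the role of CliRunner.invoke and the
    # result callback: hand the coroutine to the main loop, then block this
    # thread (not the loop) until it finishes.
    future = asyncio.run_coroutine_threadsafe(make_coro(), loop)
    return future.result(timeout=5)


async def main() -> str:
    loop = asyncio.get_running_loop()
    reply = loop.create_future()

    async def wait_for_reply() -> str:
        # Hypothetical stand-in for the subcommand awaiting the observable.
        return await reply

    # The executor thread blocks while the main loop keeps running...
    invoked = loop.run_in_executor(None, blocking_invoke, loop, wait_for_reply)
    # ...so the mock "web response" is delivered on the loop's own thread.
    loop.call_later(0.05, reply.set_result, "foo")
    return await invoked


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints: foo
```

The split of threads matters here. Calling .result() on the concurrent future returned by run_coroutine_threadsafe from the loop's own thread would block the loop that is supposed to run the coroutine, and the call would deadlock (or time out).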