Python 3.5 asyncio 从不同线程中的同步代码在事件循环上执行协程

Python 3.5 asyncio execute coroutine on event loop from synchronous code in different thread

我希望有人能在这里帮助我。

我有一个对象能够拥有 return 协程对象的属性。这工作得很好,但是我有一种情况,我需要在一个单独的线程中从同步代码中获取协程对象的结果,而事件循环当前是 运行ning。我想出的代码是:

def get_sync(self, key: str, default: typing.Any=None) -> typing.Any:
    """
    Get an attribute synchronously and safely.

    Note:
        This does nothing special if an attribute is synchronous. It only
        really has a use for asynchronous attributes. It processes
        asynchronous attributes synchronously, blocking everything until
        the attribute is processed. This helps when running SQL code that
        cannot run asynchronously in coroutines.

    Args:
        key (str): The Config object's attribute name, as a string.
        default (Any): The value to use if the Config object does not have
            the given attribute. Defaults to None.

    Returns:
        Any: The vale of the Config object's attribute, or the default
        value if the Config object does not have the given attribute.
    """
    ret = self.get(key, default)

    if asyncio.iscoroutine(ret):
        if loop.is_running():
            loop2 = asyncio.new_event_loop()
            try:
                ret = loop2.run_until_complete(ret)

            finally:
                loop2.close()
        else:
            ret = loop.run_until_complete(ret)

    return ret

我正在寻找的是一种在多线程环境中同步获取协程对象结果的安全方法。 self.get() 可以 return 一个协程对象,因为我已经设置了属性来提供它们。我发现的问题是:事件循环是否为 运行ning。在堆栈溢出和其他几个网站上搜索了几个小时后,我的(损坏的)解决方案就在上面。如果循环是 运行ning,我会创建一个新的事件循环,并在新的事件循环中 运行 我的协程。这有效,只是代码永远挂在 ret = loop2.run_until_complete(ret) 行。

现在,我有以下场景和结果:

  1. self.get() 的结果不是协程
    • Returns 结果。 [好]
  2. self.get() 的结果是协程,事件循环不是 运行ning(基本上与事件循环在同一个线程中)
    • Returns 结果。 [好]
  3. self.get() 的结果是协程,事件循环是 运行ning(基本上与事件循环在不同的线程中)
    • 永远挂起等待结果。 [差]

有谁知道我该如何解决错误的结果,以便获得我需要的价值?谢谢。

我希望我在这里说得有道理。

我确实有充分且正当的理由使用线程;具体来说,我使用的是非异步的 SQLAlchemy,我将 SQLAlchemy 代码放到 ThreadPoolExecutor 中以安全地处理它。但是,我需要能够从这些线程中查询这些异步属性,以便 SQLAlchemy 代码安全地获取某些配置值。不,我不会为了完成我的需要而从 SQLAlchemy 切换到另一个系统,所以请不要提供它的替代方案。该项目进展太远,无法将如此基础的东西切换到它。

我尝试使用 asyncio.run_coroutine_threadsafe()loop.call_soon_threadsafe(),但都失败了。到目前为止,这已经取得了最大的成功,我觉得我只是遗漏了一些明显的东西。

有机会时,我会编写一些代码来提供问题示例。

好的,我实现了一个示例案例,它按我预期的方式工作。所以我的问题很可能在代码的其他地方。保持打开状态,如果需要,将更改问题以适应我的实际问题。

关于为什么 asyncio.run_coroutine_threadsafe() 中的 concurrent.futures.Future 会永远挂起而不是 return 结果,有没有人有任何可能的想法?

不幸的是,我的示例代码 没有重复我的错误 ,如下所示:

import asyncio
import typing

loop = asyncio.get_event_loop()

class ConfigSimpleAttr:
    __slots__ = ('value', '_is_async')

    def __init__(
        self,
        value: typing.Any,
        is_async: bool=False
    ):
        self.value = value
        self._is_async = is_async

    async def _get_async(self):
        return self.value

    def __get__(self, inst, cls):
        if self._is_async and loop.is_running():
            return self._get_async()
        else:
            return self.value

class BaseConfig:
    __slots__ = ()

    attr1 = ConfigSimpleAttr(10, True)
    attr2 = ConfigSimpleAttr(20, True)    

    def get(self, key: str, default: typing.Any=None) -> typing.Any:
        return getattr(self, key, default)

    def get_sync(self, key: str, default: typing.Any=None) -> typing.Any:
        ret = self.get(key, default)

        if asyncio.iscoroutine(ret):
            if loop.is_running():
                fut = asyncio.run_coroutine_threadsafe(ret, loop)
                print(fut, fut.running())
                ret = fut.result()
            else:
                ret = loop.run_until_complete(ret)

        return ret

config = BaseConfig()

def example_func():
    return config.get_sync('attr1')

async def main():
    a1 = await loop.run_in_executor(None, example_func)
    a2 = await config.attr2
    val = a1 + a2
    print('{a1} + {a2} = {val}'.format(a1=a1, a2=a2, val=val))
    return val

loop.run_until_complete(main())

这正是我的代码正在执行的操作的精简版本,并且该示例有效,即使我的实际应用程序无效。我不知道在哪里寻找答案。欢迎就在哪里尝试追踪我的 "stuck forever" 问题提出建议,即使我上面的代码实际上并没有重复这个问题。

你不太可能需要同时运行几个事件循环,所以这部分看起来很不对:

    if loop.is_running():
        loop2 = asyncio.new_event_loop()
        try:
            ret = loop2.run_until_complete(ret)

        finally:
            loop2.close()
    else:
        ret = loop.run_until_complete(ret)

甚至测试循环是否 运行ning 似乎也不是正确的方法。最好将(唯一的)运行ning 循环显式提供给 get_sync 并使用 run_coroutine_threadsafe:

安排协程
def get_sync(self, key, loop):
    ret = self.get(key, default)
    if not asyncio.iscoroutine(ret):
        return ret
    future = asyncio.run_coroutine_threadsafe(ret, loop)
    return future.result()

编辑:挂起问题可能与被安排在错误循环中的任务有关(例如,在调用协程时忘记了可选的 loop 参数)。使用 PR 303(现已合并)应该更容易调试此类问题:当循环和未来不匹配时,会引发 RuntimeError。因此,您可能希望 运行 使用最新版本的 asyncio 进行测试。

好的,通过采用不同的方法,我的代码可以正常工作了。问题与使用具有文件 IO 的东西有关,我在文件 IO 组件上使用 loop.run_in_executor() 将其转换为协程。然后,我试图在从另一个线程调用的同步函数中使用它,并在该函数上使用另一个 loop.run_in_executor() 进行处理。这是我代码中一个非常重要的例程(在我的 short-运行 代码执行期间可能被调用一百万次或更多次),我决定我的逻辑变得太复杂了。所以...我把它简单化了。现在,如果我想异步使用文件 IO 组件,我明确地使用我的 "get_async()" 方法,否则,我通过正常的属性访问使用我的属性。

通过消除逻辑的复杂性,它使代码更清晰、更易于理解,更重要的是,它确实有效。虽然我不是 100% 确定我知道问题的根本原因(我相信它与处理属性的线程有关,然后启动另一个线程在处理之前尝试读取属性,这导致了诸如竞争条件之类的事情并停止了我的代码,但不幸的是,我永远无法在我的应用程序之外复制错误以完全证明它),我能够克服它并继续我的开发工作。