来自 运行 个线程调用其他线程上的协程

From running thread call coroutine on other thread

我正在替换现有程序的一部分。那个原始程序使用线程。有一个特定的 class 继承自 threading.Thread 我需要替换哪些功能,但我需要保持界面不变。

我正在集成的功能打包在一个经常使用 asyncio 的库中。

我要替换的 class 的原始调用是这样的:

network = Network()
network.start()

network.fetch_something()  # crashes!

network.stop()

我已经到了我的替换 class 也继承自 threading.Thread 的地步,我可以通过客户端库从 run 方法内连接到我的后端:

class Network(threading.Thread):
     def __init__(self):
         self._loop = asyncio.new_event_loop()
         self._client = Client()  # this is the library

     def run(self):
         self._loop.run_until_complete(self.__connect())  # works dandy, implementation not shown
         self._loop.run_forever()

     def fetch_something(self):
         return self._loop.run_until_complete(self._client.fetch_something())

运行 此代码抛出异常:

RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one

我大概明白这里发生了什么。在 run 方法中,事情成功了,因为同一个线程 运行 事件循环是调用者。在另一种情况下,另一个线程是调用者,因此是问题所在。 正如您可能已经注意到的那样,我希望通过使用相同的事件循环可以解决问题。唉,那没有成功。

我真的很想保持界面原样,否则我将在今年剩余时间进行重构。我可以相对轻松地将参数传递给 Network class 的构造函数。我尝试传入在主线程上创建的事件循环,但结果是一样的。

(注意这是作者遇到的相反问题:Call coroutine within Thread

从不同线程调度协程时,必须使用asyncio.run_coroutine_threadsafe。例如:

    def fetch_something(self):
        future = asyncio.run_coroutine_threadsafe(
            self._client.fetch_something(), loop)
        return future.result()

run_coroutine_threadsafe 以线程安全的方式使用事件循环调度协程,returns concurrent.futures.Future。您可以使用返回的 future 来简单地等待结果,如上所示,但您也可以将其传递给其他函数,轮询结果是否到达,或者实现超时。

当组合线程和 asyncio 时,请记住确保 all 与来自其他线程的事件循环交互(甚至调用像 loop.stop to implement Network.stop) is done using loop.call_soon_threadsafeasyncio.run_coroutine_threadsafe.