concurrent.futures.Future可以转换成asyncio.Future吗?

Can concurrent.futures.Future be converted to asyncio.Future?

我在编写多线程代码多年后正在练习 asyncio

注意到一些我觉得很奇怪的事情。在 asyncioconcurrent 中都有一个 Future 对象。

from asyncio import Future
from concurrent.futures import Future

猜猜每个人都有自己的角色..

我的问题是我是否可以将 concurrent.future.Future 转移到 asyncio.Future(或相反)?

My question is if i can transfer concurrent.future.Future to asyncio.Future (or the opposite)?

如果您所说的“转移”是指将一种转换为另一种,是的,这是可能的,尽管弥合阻抗不匹配可能需要一些工作。

转换一个concurrent.futures.Future into an asyncio.Future, you can call asyncio.wrap_future. The returned asyncio future is awaitable in the asyncio event loop and will complete when the underlying threading future completes. This is effectively how run_in_executor is implemented.

没有 public 将 asyncio future 直接转换为 concurrent.futures future 的功能,但是有 asyncio.run_coroutine_threadsafe 函数,它需要一个 coroutine,将它提交给一个事件循环,returns 一个并发的 future,它在 asyncio future 完成时完成。这可用于有效地将任何可等待的异步未来转换为并发未来,如下所示:

def to_concurrent(fut, loop):
    async def wait():
        await fut
    return asyncio.run_coroutine_threadsafe(wait(), loop)

返回的 future 将表现得像您对并发 future 的期望,例如它的 result() method will block, etc. One thing you might want to be careful of is that callbacks added to the concurrent future with add_done_callback 运行 在标记未来完成的线程中,在本例中是事件循环线程。这意味着如果你添加一些完成的回调,你需要注意不要在它们的实现中调用阻塞调用,以免阻塞事件循环。

请注意,调用 run_coroutine_threadsafe 需要事件循环在其他线程中实际 运行。 (例如,您可以启动一个后台线程并让它执行 loop.run_forever。)

对于 "concurrent future to asyncio future" 部分,这是我使用的实用程序。

from typing import List, Any
from concurrent.futures.thread import ThreadPoolExecutor
import asyncio


class AsyncThreadPool(ThreadPoolExecutor):
    _futures: List[asyncio.Future]
    _loop: asyncio.AbstractEventLoop

    def __init__(self, max_workers=None):
        super().__init__(max_workers)
        self._futures = []

    def queue(self, fn):
        self._loop = asyncio.get_event_loop()
        fut = self._loop.create_future()
        self._futures.append(fut)
        self.submit(self._entry, fn, fut)

    def queueAsync(self, coroutine):
        def newLoop():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            return loop.run_until_complete(coroutine)
        self.queue(newLoop)

    def _entry(self, fn, fut: asyncio.Future):
        try:
            result = fn()
            self._loop.call_soon_threadsafe(fut.set_result, result)
        except Exception as e:
            self._loop.call_soon_threadsafe(fut.set_exception, e)

    async def gather(self) -> List[Any]:
        return await asyncio.gather(*self._futures)

你可以这样使用它:

with AsyncThreadPool() as pool:
    # Queue some sync function (will be executed on another thread)
    pool.queue(someHeavySyncFunction)
    # Queue a coroutine that will be executed on a new event loop running on another thread
    pool.queue(otherAsyncFunction())

    # Gather results (non blocking for your current loop)
    res: List[Any] = await pool.gather()

asyncio中有一个函数叫wrap_future

Wrap a concurrent.futures.Future object in a asyncio.Future object.

https://docs.python.org/3/library/asyncio-future.html#asyncio.wrap_future