concurrent.futures.Future可以转换成asyncio.Future吗?
Can concurrent.futures.Future be converted to asyncio.Future?
我在编写多线程代码多年后正在练习 asyncio
。
注意到一些我觉得很奇怪的事情。在 asyncio
和 concurrent
中都有一个 Future
对象。
from asyncio import Future
from concurrent.futures import Future
猜猜每个人都有自己的角色..
我的问题是我是否可以将 concurrent.future.Future
转移到 asyncio.Future
(或相反)?
My question is if i can transfer concurrent.future.Future
to asyncio.Future
(or the opposite)?
如果您所说的“转移”是指将一种转换为另一种,是的,这是可能的,尽管弥合阻抗不匹配可能需要一些工作。
转换一个concurrent.futures.Future
into an asyncio.Future
, you can call asyncio.wrap_future
. The returned asyncio future is awaitable in the asyncio event loop and will complete when the underlying threading future completes. This is effectively how run_in_executor
is implemented.
没有 public 将 asyncio future 直接转换为 concurrent.futures
future 的功能,但是有 asyncio.run_coroutine_threadsafe
函数,它需要一个 coroutine,将它提交给一个事件循环,returns 一个并发的 future,它在 asyncio future 完成时完成。这可用于有效地将任何可等待的异步未来转换为并发未来,如下所示:
def to_concurrent(fut, loop):
async def wait():
await fut
return asyncio.run_coroutine_threadsafe(wait(), loop)
返回的 future 将表现得像您对并发 future 的期望,例如它的 result()
method will block, etc. One thing you might want to be careful of is that callbacks added to the concurrent future with add_done_callback
运行 在标记未来完成的线程中,在本例中是事件循环线程。这意味着如果你添加一些完成的回调,你需要注意不要在它们的实现中调用阻塞调用,以免阻塞事件循环。
请注意,调用 run_coroutine_threadsafe
需要事件循环在其他线程中实际 运行。 (例如,您可以启动一个后台线程并让它执行 loop.run_forever
。)
对于 "concurrent future to asyncio future" 部分,这是我使用的实用程序。
from typing import List, Any
from concurrent.futures.thread import ThreadPoolExecutor
import asyncio
class AsyncThreadPool(ThreadPoolExecutor):
_futures: List[asyncio.Future]
_loop: asyncio.AbstractEventLoop
def __init__(self, max_workers=None):
super().__init__(max_workers)
self._futures = []
def queue(self, fn):
self._loop = asyncio.get_event_loop()
fut = self._loop.create_future()
self._futures.append(fut)
self.submit(self._entry, fn, fut)
def queueAsync(self, coroutine):
def newLoop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(coroutine)
self.queue(newLoop)
def _entry(self, fn, fut: asyncio.Future):
try:
result = fn()
self._loop.call_soon_threadsafe(fut.set_result, result)
except Exception as e:
self._loop.call_soon_threadsafe(fut.set_exception, e)
async def gather(self) -> List[Any]:
return await asyncio.gather(*self._futures)
你可以这样使用它:
with AsyncThreadPool() as pool:
# Queue some sync function (will be executed on another thread)
pool.queue(someHeavySyncFunction)
# Queue a coroutine that will be executed on a new event loop running on another thread
pool.queue(otherAsyncFunction())
# Gather results (non blocking for your current loop)
res: List[Any] = await pool.gather()
asyncio中有一个函数叫wrap_future
。
Wrap a concurrent.futures.Future object in a asyncio.Future object.
见https://docs.python.org/3/library/asyncio-future.html#asyncio.wrap_future
我在编写多线程代码多年后正在练习 asyncio
。
注意到一些我觉得很奇怪的事情。在 asyncio
和 concurrent
中都有一个 Future
对象。
from asyncio import Future
from concurrent.futures import Future
猜猜每个人都有自己的角色..
我的问题是我是否可以将 concurrent.future.Future
转移到 asyncio.Future
(或相反)?
My question is if i can transfer
concurrent.future.Future
toasyncio.Future
(or the opposite)?
如果您所说的“转移”是指将一种转换为另一种,是的,这是可能的,尽管弥合阻抗不匹配可能需要一些工作。
转换一个concurrent.futures.Future
into an asyncio.Future
, you can call asyncio.wrap_future
. The returned asyncio future is awaitable in the asyncio event loop and will complete when the underlying threading future completes. This is effectively how run_in_executor
is implemented.
没有 public 将 asyncio future 直接转换为 concurrent.futures
future 的功能,但是有 asyncio.run_coroutine_threadsafe
函数,它需要一个 coroutine,将它提交给一个事件循环,returns 一个并发的 future,它在 asyncio future 完成时完成。这可用于有效地将任何可等待的异步未来转换为并发未来,如下所示:
def to_concurrent(fut, loop):
async def wait():
await fut
return asyncio.run_coroutine_threadsafe(wait(), loop)
返回的 future 将表现得像您对并发 future 的期望,例如它的 result()
method will block, etc. One thing you might want to be careful of is that callbacks added to the concurrent future with add_done_callback
运行 在标记未来完成的线程中,在本例中是事件循环线程。这意味着如果你添加一些完成的回调,你需要注意不要在它们的实现中调用阻塞调用,以免阻塞事件循环。
请注意,调用 run_coroutine_threadsafe
需要事件循环在其他线程中实际 运行。 (例如,您可以启动一个后台线程并让它执行 loop.run_forever
。)
对于 "concurrent future to asyncio future" 部分,这是我使用的实用程序。
from typing import List, Any
from concurrent.futures.thread import ThreadPoolExecutor
import asyncio
class AsyncThreadPool(ThreadPoolExecutor):
_futures: List[asyncio.Future]
_loop: asyncio.AbstractEventLoop
def __init__(self, max_workers=None):
super().__init__(max_workers)
self._futures = []
def queue(self, fn):
self._loop = asyncio.get_event_loop()
fut = self._loop.create_future()
self._futures.append(fut)
self.submit(self._entry, fn, fut)
def queueAsync(self, coroutine):
def newLoop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(coroutine)
self.queue(newLoop)
def _entry(self, fn, fut: asyncio.Future):
try:
result = fn()
self._loop.call_soon_threadsafe(fut.set_result, result)
except Exception as e:
self._loop.call_soon_threadsafe(fut.set_exception, e)
async def gather(self) -> List[Any]:
return await asyncio.gather(*self._futures)
你可以这样使用它:
with AsyncThreadPool() as pool:
# Queue some sync function (will be executed on another thread)
pool.queue(someHeavySyncFunction)
# Queue a coroutine that will be executed on a new event loop running on another thread
pool.queue(otherAsyncFunction())
# Gather results (non blocking for your current loop)
res: List[Any] = await pool.gather()
asyncio中有一个函数叫wrap_future
。
Wrap a concurrent.futures.Future object in a asyncio.Future object.
见https://docs.python.org/3/library/asyncio-future.html#asyncio.wrap_future