将作业提交到异步事件循环
Submit a job to an asyncio event loop
我想将作业从线程提交到 asyncio
事件循环(就像 run_in_executor 但相反)。
asyncio
文档对 concurrency and multithreading 的描述如下:
To schedule a callback from a different thread, the BaseEventLoop.call_soon_threadsafe() method should be used.
Example to schedule a coroutine from a different thread:
loop.call_soon_threadsafe(asyncio.async, coro_func())
这很好用,但协程的结果丢失了。
相反,可以使用向 async
(或 ensure_future
)返回的未来添加完成回调的函数,以便线程可以通过 concurrent.futures.Future.
标准库中没有实现这样的功能是否有特殊原因?还是我错过了更简单的方法?
我的请求得到了满足 run_coroutine_threadsafe function has been implemented here。
示例:
def target(loop, timeout=None):
future = asyncio.run_coroutine_threadsafe(add(1, b=2), loop)
return future.result(timeout)
async def add(a, b):
await asyncio.sleep(1)
return a + b
loop = asyncio.get_event_loop()
future = loop.run_in_executor(None, target, loop)
assert loop.run_until_complete(future) == 3
我最初发布了 concurrent.futures.Executor 的子 class 仍然可以实现为:
class LoopExecutor(concurrent.futures.Executor):
"""An Executor subclass that uses an event loop
to execute calls asynchronously."""
def __init__(self, loop=None):
"""Initialize the executor with a given loop."""
self.loop = loop or asyncio.get_event_loop()
def submit(self, fn, *args, **kwargs):
"""Schedule the callable, fn, to be executed as fn(*args **kwargs).
Return a Future object representing the execution of the callable."""
coro = asyncio.coroutine(fn)(*args, **kwargs)
return asyncio.run_coroutine_threadsafe(coro, self.loop)
我想将作业从线程提交到 asyncio
事件循环(就像 run_in_executor 但相反)。
asyncio
文档对 concurrency and multithreading 的描述如下:
To schedule a callback from a different thread, the BaseEventLoop.call_soon_threadsafe() method should be used. Example to schedule a coroutine from a different thread:
loop.call_soon_threadsafe(asyncio.async, coro_func())
这很好用,但协程的结果丢失了。
相反,可以使用向 async
(或 ensure_future
)返回的未来添加完成回调的函数,以便线程可以通过 concurrent.futures.Future.
标准库中没有实现这样的功能是否有特殊原因?还是我错过了更简单的方法?
我的请求得到了满足 run_coroutine_threadsafe function has been implemented here。
示例:
def target(loop, timeout=None):
future = asyncio.run_coroutine_threadsafe(add(1, b=2), loop)
return future.result(timeout)
async def add(a, b):
await asyncio.sleep(1)
return a + b
loop = asyncio.get_event_loop()
future = loop.run_in_executor(None, target, loop)
assert loop.run_until_complete(future) == 3
我最初发布了 concurrent.futures.Executor 的子 class 仍然可以实现为:
class LoopExecutor(concurrent.futures.Executor):
"""An Executor subclass that uses an event loop
to execute calls asynchronously."""
def __init__(self, loop=None):
"""Initialize the executor with a given loop."""
self.loop = loop or asyncio.get_event_loop()
def submit(self, fn, *args, **kwargs):
"""Schedule the callable, fn, to be executed as fn(*args **kwargs).
Return a Future object representing the execution of the callable."""
coro = asyncio.coroutine(fn)(*args, **kwargs)
return asyncio.run_coroutine_threadsafe(coro, self.loop)