从另一个线程调度异步协程,无需大量回调和同步等待
Scheduling an asyncio coroutine from another thread without bunch of callbacks and synchronous waiting
我需要对 问题进行澄清。
我有一个发送消息的协程send
。我想将它安排在 loop1
(在线程 1 中是 运行ning)从 loop2
(在线程 2 中是 运行ning):
async def send_threadsafe(self, message, current_loop=loop2, dest_loop=loop1):
future = asyncio.run_coroutine_threadsafe(
send(message), loop=dest_loop
)
asyncio.run_coroutine_threadsafe
返回的future
是一个concurrent.futures.Future
,不能异步等待
所以问题是:我该如何正确等待 future
and/or 我应该如何安排我的 send
来获得可等待的对象?
我知道我能做到:
async def send_threadsafe(...):
future = ...
result = await current_loop.run_in_executor(None, future.result)
但是有没有不使用另一个线程的方法呢?因为 run_in_executor
会将 future.result
发送到线程池,而我不想使用该线程池。
我不想使用 call_soon_threadsafe
的原因是它需要创建多个回调。首先,在 loop1
中安排 运行ning send
。其次,运行 send
在 loop1
并在 loop2
安排第三次回调。第三,将结果设置为在第一个回调中创建的 future(因为 asyncio futures 不是线程安全的,我无法设置来自 loop1
的结果)。
您可以使用 asyncio.wrap_future
从并发未来获取异步未来:
async def send_threadsafe(self, message, destination, *, loop=loop):
concurrent = asyncio.run_coroutine_threadsafe(
send(message), loop=destination)
return await asyncio.wrap_future(concurrent, loop=loop)
可以通过实现异步执行器来实现同样的事情:
from concurrent.futures import Executor
class AsyncioExecutor(Executor):
def __init__(self, loop):
self.loop = loop
def submit(self, fn, *args, **kwargs):
coro = fn(*args, **kwargs)
return asyncio.run_coroutine_threadsafe(coro, self.loop)
示例:
executor = AsyncioExecutor(remote_loop)
result = await loop.run_in_executor(executor, send, message)
我需要对
我有一个发送消息的协程send
。我想将它安排在 loop1
(在线程 1 中是 运行ning)从 loop2
(在线程 2 中是 运行ning):
async def send_threadsafe(self, message, current_loop=loop2, dest_loop=loop1):
future = asyncio.run_coroutine_threadsafe(
send(message), loop=dest_loop
)
asyncio.run_coroutine_threadsafe
返回的future
是一个concurrent.futures.Future
,不能异步等待
所以问题是:我该如何正确等待 future
and/or 我应该如何安排我的 send
来获得可等待的对象?
我知道我能做到:
async def send_threadsafe(...):
future = ...
result = await current_loop.run_in_executor(None, future.result)
但是有没有不使用另一个线程的方法呢?因为 run_in_executor
会将 future.result
发送到线程池,而我不想使用该线程池。
我不想使用 call_soon_threadsafe
的原因是它需要创建多个回调。首先,在 loop1
中安排 运行ning send
。其次,运行 send
在 loop1
并在 loop2
安排第三次回调。第三,将结果设置为在第一个回调中创建的 future(因为 asyncio futures 不是线程安全的,我无法设置来自 loop1
的结果)。
您可以使用 asyncio.wrap_future
从并发未来获取异步未来:
async def send_threadsafe(self, message, destination, *, loop=loop):
concurrent = asyncio.run_coroutine_threadsafe(
send(message), loop=destination)
return await asyncio.wrap_future(concurrent, loop=loop)
可以通过实现异步执行器来实现同样的事情:
from concurrent.futures import Executor
class AsyncioExecutor(Executor):
def __init__(self, loop):
self.loop = loop
def submit(self, fn, *args, **kwargs):
coro = fn(*args, **kwargs)
return asyncio.run_coroutine_threadsafe(coro, self.loop)
示例:
executor = AsyncioExecutor(remote_loop)
result = await loop.run_in_executor(executor, send, message)