如何让 asyncio 池可以被取消?
How to make an asyncio pool cancelable?
我有一个名为 pool_map 的函数,可以用来限制同时执行的函数数量。
其思路是:把一个只接受单个参数的协程函数(coroutine function)映射到一组可能的参数上,同时把每一次函数调用都包装在信号量的获取之中,这样同一时刻只有有限数量的调用在运行:
from asyncio import Semaphore
from typing import Awaitable, Callable, Iterable, Iterator, TypeVar
A = TypeVar('A')
V = TypeVar('V')


def pool_map(
    func: Callable[[A], Awaitable[V]],
    arg_it: Iterable[A],
    size: int = 10,
) -> Iterator[Awaitable[V]]:
    """
    Map an async function over an iterable, limiting concurrency.

    Every call to ``func`` is wrapped in a coroutine that first acquires a
    shared semaphore, so at most ``size`` calls are inside ``func`` at once.

    This is deliberately a plain function rather than a coroutine function:
    callers use the return value directly as an iterable (for example by
    handing it to ``asyncio.as_completed``), which is impossible if the call
    only produces a coroutine object.

    Args:
        func: Coroutine function taking a single argument.
        arg_it: Arguments to map ``func`` over.
        size: Maximum number of ``func`` calls running simultaneously.

    Returns:
        A lazy iterator of not-yet-started coroutines, one per argument,
        in input order.
    """
    semaphore = Semaphore(size)

    async def sub(arg):
        # Hold one semaphore slot for the whole duration of the call.
        async with semaphore:
            return await func(arg)

    return map(sub, arg_it)
注意:上面的代码是为了举例而改写的,并未经过测试,不过我实际使用的版本运行良好。例如,可以这样使用它:
from asyncio import get_event_loop, coroutine, as_completed
from contextlib import closing
# Placeholder: the actual list of URLs is elided in this example.
URLS = [...]
async def run_all(awaitables):
    """
    Await every awaitable and print each result as soon as it is ready.

    Results are printed in completion order, not input order. If any
    awaitable raises, the exception propagates immediately and nothing is
    done about the awaitables that are still outstanding.
    """
    for next_done in as_completed(awaitables):
        print('got result', await next_done)
# Stub coroutine function: the body is elided (`...`) in this example.
# Presumably a real implementation would fetch `url` - TODO confirm.
async def download(url): ...
# Run the demo only when executed as a script, never on import.
if __name__ == '__main__':
    pool = pool_map(download, URLS)
    # closing() guarantees the event loop is closed even if run_all() raises.
    with closing(get_event_loop()) as loop:
        loop.run_until_complete(run_all(pool))
但是,如果在等待某个 future 时抛出了异常,就会出现问题。我不知道该如何取消所有已调度或仍在运行的任务,也不知道如何取消那些仍在等待获取信号量的任务。
是否有我不知道的库或优雅的构建模块可以做到这一点,还是说我必须自己实现所有部分?(例如:一个可以访问其等待者(waiters)的 Semaphore,一个可以访问其运行中任务队列的 as_finished,……)
下面是一个朴素的解决方案,它基于这样一个事实:如果任务已经完成,调用 cancel 就是空操作(no-op):
async def run_all(awaitables):
    """
    Await all awaitables, cancelling the remainder as soon as one fails.

    Each awaitable is wrapped in a future up front so it can be cancelled
    later. Results are printed in completion order.

    Args:
        awaitables: Iterable of coroutines or futures to run.

    Raises:
        BaseException: Whatever interrupted the loop (a failure of one of
            the awaitables, or cancellation of this coroutine itself) is
            re-raised once every remaining future has been cancelled and
            has finished.
    """
    futures = [asyncio.ensure_future(a) for a in awaitables]
    try:
        for fut in as_completed(futures):
            result = await fut
            print('got result', result)
    except BaseException:
        # cancel() is a no-op on futures that are already done, so it is
        # safe to call it on every future without checking its state.
        for future in futures:
            future.cancel()
        # Give the cancelled futures a chance to actually finish.
        await asyncio.wait(futures)
        # Do not swallow the error: the caller must learn what went wrong.
        raise
使用 ensure_future 得到 Task,而不是协程:
import asyncio
from contextlib import closing
def pool_map(func, args, size=10):
    """
    Schedule ``func(x)`` for every ``x`` in ``args`` with bounded concurrency.

    All calls are scheduled immediately, but each one must acquire a shared
    semaphore before entering ``func``, so no more than ``size`` of them are
    inside ``func`` at the same time.

    Returns the list of scheduled tasks, in argument order.
    """
    limiter = asyncio.Semaphore(size)

    async def guarded(item):
        # Wait for a free slot, then run the real call while holding it.
        async with limiter:
            return await func(item)

    scheduled = []
    for item in args:
        scheduled.append(asyncio.ensure_future(guarded(item)))
    return scheduled
async def f(n):
    """
    Demo job: announce start, sleep ``n / 10`` seconds, announce end.

    Returns ``n``. The input 7 is the designated failure case: it raises
    ``Exception("boom!")`` right after the start message, before sleeping.
    """
    print(">>> start", n)
    if n != 7:
        await asyncio.sleep(n / 10)
        print("<<< end", n)
        return n
    raise Exception("boom!")
async def run_all(tasks):
    """
    Await all tasks, cancelling every task as soon as one of them fails.

    Results are printed in completion order. After a failure the loop keeps
    draining the remaining tasks (which now finish as cancelled) so that
    none is left un-awaited, and only then re-raises.

    Args:
        tasks: Re-iterable collection of tasks/futures (it is iterated once
            to await and again to cancel).

    Raises:
        Exception: The first task failure observed, i.e. the one that
            triggered the cancellation.
    """
    first_error = None
    for next_done in asyncio.as_completed(tasks):
        try:
            result = await next_done
        except asyncio.CancelledError as e:
            print("!!! cancel", e)
        except Exception as e:
            print("Exception in task, cancelling!")
            # cancel() is harmless on tasks that have already finished.
            for t in tasks:
                t.cancel()
            # Keep the root cause; later failures must not overwrite it.
            if first_error is None:
                first_error = e
        else:
            print('=== result', result)
    # Compare against None explicitly: an exception object may be falsy
    # (e.g. if its class defines __len__), and must still be re-raised.
    if first_error is not None:
        raise first_error
# Demo driver: schedule f(1) .. f(19) with at most 3 running at once, then
# run them to completion. closing() closes the event loop afterwards, even
# when run_all() re-raises the task failure.
pool = pool_map(f, range(1, 20), size=3)
with closing(asyncio.get_event_loop()) as loop:
    loop.run_until_complete(run_all(pool))
我有一个名为 pool_map 的函数,可以用来限制同时执行的函数数量。
其思路是:把一个只接受单个参数的协程函数(coroutine function)映射到一组可能的参数上,同时把每一次函数调用都包装在信号量的获取之中,这样同一时刻只有有限数量的调用在运行:
from typing import Callable, Awaitable, Iterable, Iterator
from asyncio import Semaphore
A = TypeVar('A')
V = TypeVar('V')


def pool_map(
    func: Callable[[A], Awaitable[V]],
    arg_it: Iterable[A],
    size: int = 10,
) -> Iterator[Awaitable[V]]:
    """
    Map an async function over an iterable, limiting concurrency.

    Every call to ``func`` is wrapped in a coroutine that first acquires a
    shared semaphore, so at most ``size`` calls are inside ``func`` at once.

    This is deliberately a plain function rather than a coroutine function:
    callers use the return value directly as an iterable (for example by
    handing it to ``asyncio.as_completed``), which is impossible if the call
    only produces a coroutine object.

    Args:
        func: Coroutine function taking a single argument.
        arg_it: Arguments to map ``func`` over.
        size: Maximum number of ``func`` calls running simultaneously.

    Returns:
        A lazy iterator of not-yet-started coroutines, one per argument,
        in input order.
    """
    semaphore = Semaphore(size)

    async def sub(arg):
        # Hold one semaphore slot for the whole duration of the call.
        async with semaphore:
            return await func(arg)

    return map(sub, arg_it)
注意:上面的代码是为了举例而改写的,并未经过测试,不过我实际使用的版本运行良好。例如,可以这样使用它:
from asyncio import get_event_loop, coroutine, as_completed
from contextlib import closing
# Placeholder: the actual list of URLs is elided in this example.
URLS = [...]
async def run_all(awaitables):
    """
    Await every awaitable and print each result as soon as it is ready.

    Results are printed in completion order, not input order. If any
    awaitable raises, the exception propagates immediately and nothing is
    done about the awaitables that are still outstanding.
    """
    for next_done in as_completed(awaitables):
        print('got result', await next_done)
# Stub coroutine function: the body is elided (`...`) in this example.
# Presumably a real implementation would fetch `url` - TODO confirm.
async def download(url): ...
# Run the demo only when executed as a script, never on import.
if __name__ == '__main__':
    pool = pool_map(download, URLS)
    # closing() guarantees the event loop is closed even if run_all() raises.
    with closing(get_event_loop()) as loop:
        loop.run_until_complete(run_all(pool))
但是,如果在等待某个 future 时抛出了异常,就会出现问题。我不知道该如何取消所有已调度或仍在运行的任务,也不知道如何取消那些仍在等待获取信号量的任务。
是否有我不知道的库或优雅的构建模块可以做到这一点,还是说我必须自己实现所有部分?(例如:一个可以访问其等待者(waiters)的 Semaphore,一个可以访问其运行中任务队列的 as_finished,……)
下面是一个朴素的解决方案,它基于这样一个事实:如果任务已经完成,调用 cancel 就是空操作(no-op):
async def run_all(awaitables):
    """
    Await all awaitables, cancelling the remainder as soon as one fails.

    Each awaitable is wrapped in a future up front so it can be cancelled
    later. Results are printed in completion order.

    Args:
        awaitables: Iterable of coroutines or futures to run.

    Raises:
        BaseException: Whatever interrupted the loop (a failure of one of
            the awaitables, or cancellation of this coroutine itself) is
            re-raised once every remaining future has been cancelled and
            has finished.
    """
    futures = [asyncio.ensure_future(a) for a in awaitables]
    try:
        for fut in as_completed(futures):
            result = await fut
            print('got result', result)
    except BaseException:
        # cancel() is a no-op on futures that are already done, so it is
        # safe to call it on every future without checking its state.
        for future in futures:
            future.cancel()
        # Give the cancelled futures a chance to actually finish.
        await asyncio.wait(futures)
        # Do not swallow the error: the caller must learn what went wrong.
        raise
使用 ensure_future 得到 Task,而不是协程:
import asyncio
from contextlib import closing
def pool_map(func, args, size=10):
    """
    Schedule ``func(x)`` for every ``x`` in ``args`` with bounded concurrency.

    All calls are scheduled immediately, but each one must acquire a shared
    semaphore before entering ``func``, so no more than ``size`` of them are
    inside ``func`` at the same time.

    Returns the list of scheduled tasks, in argument order.
    """
    limiter = asyncio.Semaphore(size)

    async def guarded(item):
        # Wait for a free slot, then run the real call while holding it.
        async with limiter:
            return await func(item)

    scheduled = []
    for item in args:
        scheduled.append(asyncio.ensure_future(guarded(item)))
    return scheduled
async def f(n):
    """
    Demo job: announce start, sleep ``n / 10`` seconds, announce end.

    Returns ``n``. The input 7 is the designated failure case: it raises
    ``Exception("boom!")`` right after the start message, before sleeping.
    """
    print(">>> start", n)
    if n != 7:
        await asyncio.sleep(n / 10)
        print("<<< end", n)
        return n
    raise Exception("boom!")
async def run_all(tasks):
    """
    Await all tasks, cancelling every task as soon as one of them fails.

    Results are printed in completion order. After a failure the loop keeps
    draining the remaining tasks (which now finish as cancelled) so that
    none is left un-awaited, and only then re-raises.

    Args:
        tasks: Re-iterable collection of tasks/futures (it is iterated once
            to await and again to cancel).

    Raises:
        Exception: The first task failure observed, i.e. the one that
            triggered the cancellation.
    """
    first_error = None
    for next_done in asyncio.as_completed(tasks):
        try:
            result = await next_done
        except asyncio.CancelledError as e:
            print("!!! cancel", e)
        except Exception as e:
            print("Exception in task, cancelling!")
            # cancel() is harmless on tasks that have already finished.
            for t in tasks:
                t.cancel()
            # Keep the root cause; later failures must not overwrite it.
            if first_error is None:
                first_error = e
        else:
            print('=== result', result)
    # Compare against None explicitly: an exception object may be falsy
    # (e.g. if its class defines __len__), and must still be re-raised.
    if first_error is not None:
        raise first_error
# Demo driver: schedule f(1) .. f(19) with at most 3 running at once, then
# run them to completion. closing() closes the event loop afterwards, even
# when run_all() re-raises the task failure.
pool = pool_map(f, range(1, 20), size=3)
with closing(asyncio.get_event_loop()) as loop:
    loop.run_until_complete(run_all(pool))