Select 第一个结果来自 asyncio 中的两个协程
Select first result from two coroutines in asyncio
问题
使用 Python 的 asyncio
模块,我如何 select 来自多个协程的第一个结果?
例子
我可能想在等待队列时实现超时:
result = yield from select(asyncio.sleep(1),
queue.get())
类似操作
这类似于 Go's select
or Clojure's core.async.alt!
. It is something like the converse of asyncio.gather
(收集就像 all
,select 就像 any
。)
您可以使用 asyncio.wait
和 asyncio.as_completed
来实现:
import asyncio


async def ok():
    """Complete after one second with the value 5."""
    await asyncio.sleep(1)
    return 5


async def select1(*futures, loop=None):
    """Return the result of the first awaitable to finish.

    NOTE: the remaining awaitables keep running; they are destroyed while
    still pending when the loop shuts down, which is what produces the
    "Task was destroyed but it is pending!" warnings shown below.
    """
    # `loop` is kept only for backward compatibility with older callers;
    # as_completed() no longer needs an explicit loop.
    return await next(asyncio.as_completed(futures))


async def select2(*futures, loop=None):
    """Return the result of the first awaitable to finish, via wait()."""
    # asyncio.wait() requires Tasks/Futures since Python 3.8 (bare
    # coroutines raise TypeError on 3.11+), so wrap them explicitly.
    futures = [asyncio.ensure_future(f) for f in futures]
    done, running = await asyncio.wait(futures,
                                       return_when=asyncio.FIRST_COMPLETED)
    result = done.pop()
    return result.result()


async def example():
    """Race a queue read against ok(); the queue side never completes."""
    queue = asyncio.Queue()
    result = await select1(ok(), queue.get())
    print('got {}'.format(result))
    result = await select2(queue.get(), ok())
    print('got {}'.format(result))


if __name__ == "__main__":
    # asyncio.run() creates/closes the loop and cancels leftover tasks.
    asyncio.run(example())
输出:
got 5
got 5
Task was destroyed but it is pending!
task: <Task pending coro=<get() done, defined at /usr/lib/python3.4/asyncio/queues.py:170> wait_for=<Future pending cb=[Task._wakeup()]> cb=[as_completed.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:463]>
Task was destroyed but it is pending!
task: <Task pending coro=<get() done, defined at /usr/lib/python3.4/asyncio/queues.py:170> wait_for=<Future pending cb=[Task._wakeup()]>>
两个实现 return 第一个完成的 Future
产生的值,但您可以轻松地将其调整为 return Future
本身。请注意,因为传递给每个 select
实现的另一个 Future
永远不会产生,所以当进程退出时会发出警告。
简单的解决方案,使用 asyncio.wait
及其 FIRST_COMPLETED
参数:
import asyncio


async def something_to_wait():
    """Finish after 1 second."""
    await asyncio.sleep(1)
    return "something_to_wait"


async def something_else_to_wait():
    """Finish after 2 seconds."""
    await asyncio.sleep(2)
    return "something_else_to_wait"


async def wait_first():
    """Run both coroutines and report which finished first vs. pending."""
    # asyncio.wait() requires Tasks/Futures since Python 3.8 (bare
    # coroutines raise TypeError on 3.11+), so wrap them explicitly.
    tasks = [asyncio.ensure_future(something_to_wait()),
             asyncio.ensure_future(something_else_to_wait())]
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED)
    print("done", done)
    print("pending", pending)


if __name__ == "__main__":
    # Guarded so importing this module does not spin up an event loop.
    asyncio.run(wait_first())
给出:
done {<Task finished coro=<something_to_wait() done, defined at stack.py:3> result='something_to_wait'>}
pending {<Task pending coro=<something_else_to_wait() running at stack.py:8> wait_for=<Future pending cb=[Task._wakeup()]>>}
Task was destroyed but it is pending!
task: <Task pending coro=<something_else_to_wait() running at stack.py:8> wait_for=<Future pending cb=[Task._wakeup()]>>
在想要对任务应用超时的情况下,有一个标准库函数可以做到这一点:asyncio.wait_for()
。你的例子可以这样写:
# NOTE: fragment, not a standalone script — this runs inside an async
# function where `queue` is an asyncio.Queue created earlier.
try:
result = await asyncio.wait_for(queue.get(), timeout=1)
except asyncio.TimeoutError:
# This block will execute if queue.get() takes more than 1s.
result = ...
但这只适用于超时的特定情况。这里的其他两个答案概括为任意一组任务,但这些答案都没有说明如何清理没有先完成的任务。这就是导致输出中出现 "Task was destroyed but it is pending" 消息的原因。实际上,您应该对那些待处理的任务做些什么。根据您的示例,我假设您不关心其他任务的结果。这是一个 wait_first()
函数的示例,它 return 是第一个完成任务的值并取消剩余任务。
import asyncio, random


async def foo(x):
    """Sleep for a random fraction of a second, then return x."""
    r = random.random()
    print('foo({:d}) sleeping for {:0.3f}'.format(x, r))
    await asyncio.sleep(r)
    print('foo({:d}) done'.format(x))
    return x


async def wait_first(*futures):
    ''' Return the result of the first future to finish. Cancel the remaining
    futures. '''
    # asyncio.wait() requires Tasks/Futures since Python 3.8 (bare
    # coroutines raise TypeError on 3.11+), so wrap them explicitly.
    futures = [asyncio.ensure_future(f) for f in futures]
    done, pending = await asyncio.wait(futures,
                                       return_when=asyncio.FIRST_COMPLETED)
    # Cancel the losers and wait for cancellation to finish, so no
    # "Task was destroyed but it is pending!" warning is emitted.
    gather = asyncio.gather(*pending)
    gather.cancel()
    try:
        await gather
    except asyncio.CancelledError:
        pass
    return done.pop().result()


async def main():
    result = await wait_first(foo(1), foo(2))
    print('the result is {}'.format(result))


if __name__ == '__main__':
    # asyncio.run() replaces the manual get_event_loop()/close() dance.
    asyncio.run(main())
运行 这个例子:
# export PYTHONASYNCIODEBUG=1
# python3 test.py
foo(1) sleeping for 0.381
foo(2) sleeping for 0.279
foo(2) done
the result is 2
# python3 test.py
foo(1) sleeping for 0.048
foo(2) sleeping for 0.515
foo(1) done
the result is 1
# python3 test.py
foo(1) sleeping for 0.396
foo(2) sleeping for 0.188
foo(2) done
the result is 2
没有关于挂起任务的错误消息,因为每个挂起任务都已正确清理。
在实践中,您可能希望 wait_first()
返回 future 本身,而不是 future 的结果,否则很难判断是哪个 future 先完成。但在此处的示例中,我返回了 future 的结果,因为这样看起来更简洁。
这是一个更强大的解决方案,它基于处理以下问题的早期示例:
- 使用递归到 return 第一个非空结果(前面的例子将 return 第一个结果,不管是空还是非空)
- Returns 第一个非空结果,即使另一个任务引发异常
- 如果仅 returned 非空结果并引发异常,则引发最后一个异常
- 处理同时完成的多个任务 - 在实践中这种情况非常罕见,但它可能会在单元测试中弹出,其中伪造的异步任务会立即完成。
请注意,由于使用了赋值表达式(海象运算符 :=),此示例需要 Python 3.8。
async def wait_first(*tasks):
    """Return the result of the first async task to complete with a non-null result.

    - Recurses until a truthy result appears (a falsy first result is skipped).
    - Returns the first non-null result even if another task raises.
    - Raises the last seen exception when no non-null result is produced.
    - Handles several tasks finishing in the same event-loop step.
    Requires Python 3.8+ (assignment expressions).
    """
    # asyncio.wait() requires Tasks/Futures since Python 3.8 (bare
    # coroutines raise TypeError on 3.11+); ensure_future() is a no-op
    # for tasks already wrapped, so recursion stays cheap.
    tasks = [asyncio.ensure_future(t) for t in tasks]
    # Get first completed task(s)
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # Tasks MAY complete at the same time, e.g. in unit tests with fakes
    # that finish immediately — pick the first truthy result among them.
    for task in done:
        exception = task.exception()
        if exception is None and (result := task.result()):
            break
    else:
        result = None
    # return_exceptions=True keeps gather from propagating errors from
    # losers; only the first non-null result matters here.
    gather = asyncio.gather(*pending, return_exceptions=True)
    if result:
        # We have a winner: cancel everything still running.
        gather.cancel()
    elif pending:
        # No usable result yet: recurse into the still-pending tasks.
        result = await wait_first(*pending)
    # Await the gather so cancellations are fully processed (avoids
    # "Task was destroyed but it is pending!" warnings).
    try:
        await gather
    except asyncio.CancelledError:
        pass
    # Surface the last exception only when no non-null result was found.
    if exception and result is None:
        raise exception
    else:
        return result
问题
使用 Python 的 asyncio
模块,我如何 select 来自多个协程的第一个结果?
例子
我可能想在等待队列时实现超时:
result = yield from select(asyncio.sleep(1),
queue.get())
类似操作
这类似于 Go's select
or Clojure's core.async.alt!
. It is something like the converse of asyncio.gather
(收集就像 all
,select 就像 any
。)
您可以使用 asyncio.wait
和 asyncio.as_completed
来实现:
import asyncio


async def ok():
    """Complete after one second with the value 5."""
    await asyncio.sleep(1)
    return 5


async def select1(*futures, loop=None):
    """Return the result of the first awaitable to finish.

    NOTE: the remaining awaitables keep running; they are destroyed while
    still pending when the loop shuts down, which is what produces the
    "Task was destroyed but it is pending!" warnings shown below.
    """
    # `loop` is kept only for backward compatibility with older callers;
    # as_completed() no longer needs an explicit loop.
    return await next(asyncio.as_completed(futures))


async def select2(*futures, loop=None):
    """Return the result of the first awaitable to finish, via wait()."""
    # asyncio.wait() requires Tasks/Futures since Python 3.8 (bare
    # coroutines raise TypeError on 3.11+), so wrap them explicitly.
    futures = [asyncio.ensure_future(f) for f in futures]
    done, running = await asyncio.wait(futures,
                                       return_when=asyncio.FIRST_COMPLETED)
    result = done.pop()
    return result.result()


async def example():
    """Race a queue read against ok(); the queue side never completes."""
    queue = asyncio.Queue()
    result = await select1(ok(), queue.get())
    print('got {}'.format(result))
    result = await select2(queue.get(), ok())
    print('got {}'.format(result))


if __name__ == "__main__":
    # asyncio.run() creates/closes the loop and cancels leftover tasks.
    asyncio.run(example())
输出:
got 5
got 5
Task was destroyed but it is pending!
task: <Task pending coro=<get() done, defined at /usr/lib/python3.4/asyncio/queues.py:170> wait_for=<Future pending cb=[Task._wakeup()]> cb=[as_completed.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:463]>
Task was destroyed but it is pending!
task: <Task pending coro=<get() done, defined at /usr/lib/python3.4/asyncio/queues.py:170> wait_for=<Future pending cb=[Task._wakeup()]>>
两个实现 return 第一个完成的 Future
产生的值,但您可以轻松地将其调整为 return Future
本身。请注意,因为传递给每个 select
实现的另一个 Future
永远不会产生,所以当进程退出时会发出警告。
简单的解决方案,使用 asyncio.wait
及其 FIRST_COMPLETED
参数:
import asyncio


async def something_to_wait():
    """Finish after 1 second."""
    await asyncio.sleep(1)
    return "something_to_wait"


async def something_else_to_wait():
    """Finish after 2 seconds."""
    await asyncio.sleep(2)
    return "something_else_to_wait"


async def wait_first():
    """Run both coroutines and report which finished first vs. pending."""
    # asyncio.wait() requires Tasks/Futures since Python 3.8 (bare
    # coroutines raise TypeError on 3.11+), so wrap them explicitly.
    tasks = [asyncio.ensure_future(something_to_wait()),
             asyncio.ensure_future(something_else_to_wait())]
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED)
    print("done", done)
    print("pending", pending)


if __name__ == "__main__":
    # Guarded so importing this module does not spin up an event loop.
    asyncio.run(wait_first())
给出:
done {<Task finished coro=<something_to_wait() done, defined at stack.py:3> result='something_to_wait'>}
pending {<Task pending coro=<something_else_to_wait() running at stack.py:8> wait_for=<Future pending cb=[Task._wakeup()]>>}
Task was destroyed but it is pending!
task: <Task pending coro=<something_else_to_wait() running at stack.py:8> wait_for=<Future pending cb=[Task._wakeup()]>>
在想要对任务应用超时的情况下,有一个标准库函数可以做到这一点:asyncio.wait_for()
。你的例子可以这样写:
# NOTE: fragment, not a standalone script — this runs inside an async
# function where `queue` is an asyncio.Queue created earlier.
try:
result = await asyncio.wait_for(queue.get(), timeout=1)
except asyncio.TimeoutError:
# This block will execute if queue.get() takes more than 1s.
result = ...
但这只适用于超时的特定情况。这里的其他两个答案概括为任意一组任务,但这些答案都没有说明如何清理没有先完成的任务。这就是导致输出中出现 "Task was destroyed but it is pending" 消息的原因。实际上,您应该对那些待处理的任务做些什么。根据您的示例,我假设您不关心其他任务的结果。这是一个 wait_first()
函数的示例,它 return 是第一个完成任务的值并取消剩余任务。
import asyncio, random


async def foo(x):
    """Sleep for a random fraction of a second, then return x."""
    r = random.random()
    print('foo({:d}) sleeping for {:0.3f}'.format(x, r))
    await asyncio.sleep(r)
    print('foo({:d}) done'.format(x))
    return x


async def wait_first(*futures):
    ''' Return the result of the first future to finish. Cancel the remaining
    futures. '''
    # asyncio.wait() requires Tasks/Futures since Python 3.8 (bare
    # coroutines raise TypeError on 3.11+), so wrap them explicitly.
    futures = [asyncio.ensure_future(f) for f in futures]
    done, pending = await asyncio.wait(futures,
                                       return_when=asyncio.FIRST_COMPLETED)
    # Cancel the losers and wait for cancellation to finish, so no
    # "Task was destroyed but it is pending!" warning is emitted.
    gather = asyncio.gather(*pending)
    gather.cancel()
    try:
        await gather
    except asyncio.CancelledError:
        pass
    return done.pop().result()


async def main():
    result = await wait_first(foo(1), foo(2))
    print('the result is {}'.format(result))


if __name__ == '__main__':
    # asyncio.run() replaces the manual get_event_loop()/close() dance.
    asyncio.run(main())
运行 这个例子:
# export PYTHONASYNCIODEBUG=1
# python3 test.py
foo(1) sleeping for 0.381
foo(2) sleeping for 0.279
foo(2) done
the result is 2
# python3 test.py
foo(1) sleeping for 0.048
foo(2) sleeping for 0.515
foo(1) done
the result is 1
# python3 test.py
foo(1) sleeping for 0.396
foo(2) sleeping for 0.188
foo(2) done
the result is 2
没有关于挂起任务的错误消息,因为每个挂起任务都已正确清理。
在实践中,您可能希望 wait_first()
返回 future 本身,而不是 future 的结果,否则很难判断是哪个 future 先完成。但在此处的示例中,我返回了 future 的结果,因为这样看起来更简洁。
这是一个更强大的解决方案,它基于处理以下问题的早期示例:
- 使用递归到 return 第一个非空结果(前面的例子将 return 第一个结果,不管是空还是非空)
- Returns 第一个非空结果,即使另一个任务引发异常
- 如果仅 returned 非空结果并引发异常,则引发最后一个异常
- 处理同时完成的多个任务 - 在实践中这种情况非常罕见,但它可能会在单元测试中弹出,其中伪造的异步任务会立即完成。
请注意,由于使用了赋值表达式(海象运算符 :=),此示例需要 Python 3.8。
async def wait_first(*tasks):
    """Return the result of the first async task to complete with a non-null result.

    - Recurses until a truthy result appears (a falsy first result is skipped).
    - Returns the first non-null result even if another task raises.
    - Raises the last seen exception when no non-null result is produced.
    - Handles several tasks finishing in the same event-loop step.
    Requires Python 3.8+ (assignment expressions).
    """
    # asyncio.wait() requires Tasks/Futures since Python 3.8 (bare
    # coroutines raise TypeError on 3.11+); ensure_future() is a no-op
    # for tasks already wrapped, so recursion stays cheap.
    tasks = [asyncio.ensure_future(t) for t in tasks]
    # Get first completed task(s)
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # Tasks MAY complete at the same time, e.g. in unit tests with fakes
    # that finish immediately — pick the first truthy result among them.
    for task in done:
        exception = task.exception()
        if exception is None and (result := task.result()):
            break
    else:
        result = None
    # return_exceptions=True keeps gather from propagating errors from
    # losers; only the first non-null result matters here.
    gather = asyncio.gather(*pending, return_exceptions=True)
    if result:
        # We have a winner: cancel everything still running.
        gather.cancel()
    elif pending:
        # No usable result yet: recurse into the still-pending tasks.
        result = await wait_first(*pending)
    # Await the gather so cancellations are fully processed (avoids
    # "Task was destroyed but it is pending!" warnings).
    try:
        await gather
    except asyncio.CancelledError:
        pass
    # Surface the last exception only when no non-null result was found.
    if exception and result is None:
        raise exception
    else:
        return result