How to handle tasks with varying completion times
I was watching a video(a) about asyncio on YouTube, and at one point code like the following was presented as an efficient way to handle multiple HTTP requests:
# Need an event loop for doing this.
loop = asyncio.get_event_loop()
# Task creation section.
tasks = []
for n in range(1, 50):
    tasks.append(loop.create_task(get_html(f"https://example.com/things?id={n}")))
# Task processing section.
for task in tasks:
    html = await task
    thing = get_thing_from_html(html)
    print(f"Thing found: {thing}", flush=True)
I realize this is efficient in the sense that everything runs concurrently, but what worries me is a situation like this:
- the first task takes a full minute; but
- all the others finish within three seconds.
Because the task-processing section awaits the tasks in the order they entered the list, it seems to me that none of them will be reported as complete until the first one finishes. At that point the others, which finished long ago, will all be reported at once. Is my understanding correct?
If so, what is the normal way to handle this situation, so that you are notified of each task's completion as soon as it happens?
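To make my concern concrete, here is a minimal sketch I put together (asyncio.sleep stands in for get_html, and the delays are invented) that reproduces the behavior I am describing:

import asyncio
import time

async def fake_get_html(n: int, delay: float) -> str:
    # stand-in for get_html: sleep, then return a dummy payload
    await asyncio.sleep(delay)
    return f"<html>thing {n}</html>"

async def main() -> None:
    start = time.perf_counter()
    # task 0 is slow (5 s, standing in for the "full minute");
    # all the others finish in 0.1 s
    delays = [5.0] + [0.1] * 9
    tasks = [asyncio.create_task(fake_get_html(n, d))
             for n, d in enumerate(delays)]
    for task in tasks:
        html = await task
        # every line prints at ~5 s: nothing is reported until task 0 finishes
        print(f"{time.perf_counter() - start:4.1f}s: {html}")

asyncio.run(main())

Every line prints at roughly the five-second mark, even though nine of the ten tasks finished almost immediately.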
(a) From Michael Kennedy of "Talk Python To Me" podcast fame. In case you are interested, the video is Demystifying Python's Async and Await Keywords. I have no affiliation with the site other than enjoying the podcast, so I recommend it wholeheartedly.
You can run tasks in parallel with the code sample below. I introduce asyncio.gather to run tasks concurrently. I also demonstrate the poison pill technique and the daemon task technique.
Please pay attention to the comments in the code, and feel free to ask questions.
import asyncio
from random import randint

WORKERS_NUMBER = 5
URL_NUM = 20

async def producer(task_q: asyncio.Queue) -> None:
    """Produce tasks and send them to the workers."""
    print("Producer-Task Started")
    # imagine that this is a list of urls
    for i in range(URL_NUM):
        await task_q.put(i)
    # send one poison pill per worker
    for i in range(WORKERS_NUMBER):
        await task_q.put(None)
    print("Producer-Task Finished")

async def results_shower(result_q: asyncio.Queue) -> None:
    """Receive results from the worker tasks and show them."""
    while True:
        res = await result_q.get()
        print(res)
        result_q.task_done()  # confirm that this item has been processed

async def worker(
    name: str,
    task_q: asyncio.Queue,
    result_q: asyncio.Queue,
) -> None:
    """Get tasks from task_q, do some work, and send results to result_q."""
    print(f"Worker {name} Started")
    while True:
        task = await task_q.get()
        # if the worker receives a poison pill, stop
        if task is None:
            break
        await asyncio.sleep(randint(1, 10))
        result = task ** 2
        await result_q.put(result)
    print(f"Worker {name} Finished")

async def amain():
    """Wrapper around all async ops in the app."""
    _task_q = asyncio.Queue(maxsize=5)  # just some arbitrary maxsize
    _results_q = asyncio.Queue(maxsize=5)  # just some arbitrary maxsize
    # we run results_shower as a "daemon task" that we never await:
    # if the asyncio loop has nothing else to do, it stops without
    # waiting for the "daemon task" (we keep a reference to the task
    # so it cannot be garbage-collected while it is still running)
    shower_task = asyncio.create_task(results_shower(_results_q))
    # gather runs the tasks in parallel and waits until all of them are finished
    await asyncio.gather(
        producer(_task_q),
        *[worker(f"W-{i}", _task_q, _results_q) for i in range(WORKERS_NUMBER)]
    )
    # q.join() keeps the loop from stopping until results_shower has printed
    # every result: the queue keeps an internal counter that q.put() increments
    # and task_done() decrements; when it reaches 0, join() returns
    await _results_q.join()
    print("All work is finished!")

if __name__ == '__main__':
    asyncio.run(amain())
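As a side note: if you would rather not send sentinel values, the same shutdown can be done with task_q.join() plus worker cancellation. A minimal sketch of that variant, under the same assumptions as above (the squaring work and the random sleep are placeholders):

import asyncio
from random import randint

async def worker(name: str, task_q: asyncio.Queue, result_q: asyncio.Queue) -> None:
    # runs forever; shut down by cancellation instead of a poison pill
    while True:
        item = await task_q.get()
        try:
            await asyncio.sleep(randint(1, 10))
            await result_q.put(item ** 2)
        finally:
            task_q.task_done()  # let task_q.join() observe progress

async def amain() -> None:
    task_q: asyncio.Queue = asyncio.Queue()
    result_q: asyncio.Queue = asyncio.Queue()
    workers = [asyncio.create_task(worker(f"W-{i}", task_q, result_q))
               for i in range(5)]
    for i in range(20):
        await task_q.put(i)
    await task_q.join()  # all items fetched and marked done
    for w in workers:
        w.cancel()       # no poison pills needed
    await asyncio.gather(*workers, return_exceptions=True)
    while not result_q.empty():
        print(result_q.get_nowait())

asyncio.run(amain())

The trade-off is that every worker must call task_done() for each item it takes, or join() never returns.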
If you just need to do something after each task, you can create another async function that does it, and run those in parallel:
async def wrapped_get_html(url):
    html = await get_html(url)
    thing = get_thing_from_html(html)
    print(f"Thing found: {thing}")

async def main():
    # shorthand for creating tasks and awaiting them all
    await asyncio.gather(*[
        wrapped_get_html(f"https://example.com/things?id={n}")
        for n in range(50)])

asyncio.run(main())
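One caveat with gather: by default the first exception propagates out of await asyncio.gather(...), while the remaining tasks keep running. If you want every outcome regardless of failures, pass return_exceptions=True. A sketch, assuming the wrapped_get_html defined above:

async def main():
    results = await asyncio.gather(
        *[wrapped_get_html(f"https://example.com/things?id={n}")
          for n in range(50)],
        return_exceptions=True)  # exceptions are returned, not raised
    for n, res in enumerate(results):
        if isinstance(res, Exception):
            print(f"id={n} failed: {res!r}")

asyncio.run(main())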
If for some reason you need to notify the main loop, you can use as_completed:
async def main():
    for next_done in asyncio.as_completed([
            get_html(f"https://example.com/things?id={n}")
            for n in range(50)]):
        html = await next_done
        thing = get_thing_from_html(html)
        print(f"Thing found: {thing}")

asyncio.run(main())
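as_completed also accepts an optional timeout for the whole batch; once it expires, awaiting the next item raises asyncio.TimeoutError. A sketch with a made-up 30-second budget, reusing the same hypothetical get_html and get_thing_from_html:

async def main():
    coros = [get_html(f"https://example.com/things?id={n}")
             for n in range(50)]
    for next_done in asyncio.as_completed(coros, timeout=30):
        try:
            html = await next_done
        except asyncio.TimeoutError:
            print("Out of time; giving up on the stragglers")
            break
        print(f"Thing found: {get_thing_from_html(html)}")

asyncio.run(main())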