为 asyncio 制作一个 tqdm 进度条
Making a tqdm progress bar for asyncio
我正在尝试收集异步任务的 tqdm 进度条。
希望进度条在任务完成后逐步更新。试过代码:
import asyncio
import tqdm
import random
async def factorial(name, number):
f = 1
for i in range(2, number+1):
await asyncio.sleep(random.random())
f *= i
print(f"Task {name}: factorial {number} = {f}")
async def tq(flen):
for _ in tqdm.tqdm(range(flen)):
await asyncio.sleep(0.1)
async def main():
# Schedule the three concurrently
flist = [factorial("A", 2),
factorial("B", 3),
factorial("C", 4)]
await asyncio.gather(*flist, tq(len(flist)))
asyncio.run(main())
...但这只是完成了 tqdm 柱,然后处理阶乘。
有没有办法让进度条在每个异步任务完成时移动?
现在,我对 asyncho
不是特别熟悉,尽管我在 python 中使用 tqdm
并在多进程方面取得了一些成功。
对您的代码所做的以下更改似乎会更新进度条并同时打印结果,这可能足以让您入门。
responses = [await f
for f in tqdm.tqdm(asyncio.as_completed(flist), total=len(flist))]
以上内容应替换您 main
定义中的 await asyncio.gather(*flist, tq(len(flist)))
。
更多信息,以上灵感来自
为了只打印一次进度条并更新它,我做了以下操作,它更新了进度条的描述以包含您的消息:
import asyncio
import tqdm
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
await asyncio.sleep(1)
f *= i
return f"Task {name}: factorial {number} = {f}"
async def tq(flen):
for _ in tqdm.tqdm(range(flen)):
await asyncio.sleep(0.1)
async def main():
# Schedule the three concurrently
flist = [factorial("A", 2),
factorial("B", 3),
factorial("C", 4)]
pbar = tqdm.tqdm(total=len(flist))
for f in asyncio.as_completed(flist):
value = await f
pbar.set_description(value)
pbar.update()
if __name__ == '__main__':
asyncio.run(main())
对 pbar
格式的 Dragos 代码进行了一些小改动,并使用 tqdm.write()
几乎得到了我想要的,如下所示:
import asyncio
import random
import tqdm
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
await asyncio.sleep(random.random())
f *= i
return f"Task {name}: factorial {number} = {f}"
async def tq(flen):
for _ in tqdm.tqdm(range(flen)):
await asyncio.sleep(0.1)
async def main():
flist = [factorial("A", 2),
factorial("B", 3),
factorial("C", 4)]
pbar = tqdm.tqdm(total=len(flist), position=0, ncols=90)
for f in asyncio.as_completed(flist):
value = await f
pbar.set_description(desc=value, refresh=True)
tqdm.tqdm.write(value)
pbar.update()
if __name__ == '__main__':
asyncio.run(main())
从 tqdm version 4.48.0 开始,可以使用 tqdm.asyncio.tqdm.as_completed()
import tqdm.asyncio
...
for f in tqdm.asyncio.tqdm.as_completed(flist):
await f
这是一个围绕 TQDM 的异步包装器 return 有序结果:
import asyncio
from typing import Any, Coroutine, Iterable, List, Tuple
from tqdm import tqdm
async def aprogress(tasks: Iterable[Coroutine], **pbar_kws: Any) -> List[Any]:
"""Runs async tasks with a progress bar and returns an ordered result."""
if not tasks:
return []
async def tup(idx: int, task: Coroutine) -> Tuple[int, Any]:
"""Returns the index and result of a task."""
return idx, await task
_tasks = [tup(i, t) for i, t in enumerate(tasks)]
pbar = tqdm(asyncio.as_completed(_tasks), total=len(_tasks), **pbar_kws)
res = [await t for t in pbar]
return [r[1] for r in sorted(res, key=lambda r: r[0])]
if __name__ == "__main__":
import random
async def test(idx: int) -> Tuple[int, int]:
sleep = random.randint(0, 5)
await asyncio.sleep(sleep)
return idx, sleep
_tasks = [test(i) for i in range(10)]
_res = asyncio.run(aprogress(_tasks, desc="pbar test"))
print(_res)
我正在尝试收集异步任务的 tqdm 进度条。
希望进度条在任务完成后逐步更新。试过代码:
import asyncio
import tqdm
import random
async def factorial(name, number):
f = 1
for i in range(2, number+1):
await asyncio.sleep(random.random())
f *= i
print(f"Task {name}: factorial {number} = {f}")
async def tq(flen):
for _ in tqdm.tqdm(range(flen)):
await asyncio.sleep(0.1)
async def main():
# Schedule the three concurrently
flist = [factorial("A", 2),
factorial("B", 3),
factorial("C", 4)]
await asyncio.gather(*flist, tq(len(flist)))
asyncio.run(main())
...但这只是完成了 tqdm 柱,然后处理阶乘。
有没有办法让进度条在每个异步任务完成时移动?
现在,我对 asyncho
不是特别熟悉,尽管我在 python 中使用 tqdm
并在多进程方面取得了一些成功。
对您的代码所做的以下更改似乎会更新进度条并同时打印结果,这可能足以让您入门。
responses = [await f
for f in tqdm.tqdm(asyncio.as_completed(flist), total=len(flist))]
以上内容应替换您 main
定义中的 await asyncio.gather(*flist, tq(len(flist)))
。
更多信息,以上灵感来自
为了只打印一次进度条并更新它,我做了以下操作,它更新了进度条的描述以包含您的消息:
import asyncio
import tqdm
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
await asyncio.sleep(1)
f *= i
return f"Task {name}: factorial {number} = {f}"
async def tq(flen):
for _ in tqdm.tqdm(range(flen)):
await asyncio.sleep(0.1)
async def main():
# Schedule the three concurrently
flist = [factorial("A", 2),
factorial("B", 3),
factorial("C", 4)]
pbar = tqdm.tqdm(total=len(flist))
for f in asyncio.as_completed(flist):
value = await f
pbar.set_description(value)
pbar.update()
if __name__ == '__main__':
asyncio.run(main())
对 pbar
格式的 Dragos 代码进行了一些小改动,并使用 tqdm.write()
几乎得到了我想要的,如下所示:
import asyncio
import random
import tqdm
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
await asyncio.sleep(random.random())
f *= i
return f"Task {name}: factorial {number} = {f}"
async def tq(flen):
for _ in tqdm.tqdm(range(flen)):
await asyncio.sleep(0.1)
async def main():
flist = [factorial("A", 2),
factorial("B", 3),
factorial("C", 4)]
pbar = tqdm.tqdm(total=len(flist), position=0, ncols=90)
for f in asyncio.as_completed(flist):
value = await f
pbar.set_description(desc=value, refresh=True)
tqdm.tqdm.write(value)
pbar.update()
if __name__ == '__main__':
asyncio.run(main())
从 tqdm version 4.48.0 开始,可以使用 tqdm.asyncio.tqdm.as_completed()
import tqdm.asyncio
...
for f in tqdm.asyncio.tqdm.as_completed(flist):
await f
这是一个围绕 TQDM 的异步包装器 return 有序结果:
import asyncio
from typing import Any, Coroutine, Iterable, List, Tuple
from tqdm import tqdm
async def aprogress(tasks: Iterable[Coroutine], **pbar_kws: Any) -> List[Any]:
"""Runs async tasks with a progress bar and returns an ordered result."""
if not tasks:
return []
async def tup(idx: int, task: Coroutine) -> Tuple[int, Any]:
"""Returns the index and result of a task."""
return idx, await task
_tasks = [tup(i, t) for i, t in enumerate(tasks)]
pbar = tqdm(asyncio.as_completed(_tasks), total=len(_tasks), **pbar_kws)
res = [await t for t in pbar]
return [r[1] for r in sorted(res, key=lambda r: r[0])]
if __name__ == "__main__":
import random
async def test(idx: int) -> Tuple[int, int]:
sleep = random.randint(0, 5)
await asyncio.sleep(sleep)
return idx, sleep
_tasks = [test(i) for i in range(10)]
_res = asyncio.run(aprogress(_tasks, desc="pbar test"))
print(_res)