为 asyncio 制作一个 tqdm 进度条

Making a tqdm progress bar for asyncio

我正在尝试收集异步任务的 tqdm 进度条。

希望进度条在任务完成后逐步更新。试过代码:

import asyncio
import tqdm
import random

async def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        await asyncio.sleep(random.random())
        f *= i
    print(f"Task {name}: factorial {number} = {f}")

async def tq(flen):
    for _ in tqdm.tqdm(range(flen)):
        await asyncio.sleep(0.1)

async def main():
    # Schedule the three concurrently

    flist = [factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4)]

    await asyncio.gather(*flist, tq(len(flist)))

asyncio.run(main())

...但这只是完成了 tqdm 柱,然后处理阶乘。

有没有办法让进度条在每个异步任务完成时移动?

现在,我对 asyncho 不是特别熟悉,尽管我在 python 中使用 tqdm 并在多进程方面取得了一些成功。 对您的代码所做的以下更改似乎会更新进度条并同时打印结果,这可能足以让您入门。

responses = [await f
                 for f in tqdm.tqdm(asyncio.as_completed(flist), total=len(flist))]

以上内容应替换您 main 定义中的 await asyncio.gather(*flist, tq(len(flist)))

更多信息,以上灵感来自

为了只打印一次进度条并更新它,我做了以下操作,它更新了进度条的描述以包含您的消息:

import asyncio
import tqdm


async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        await asyncio.sleep(1)
        f *= i
    return f"Task {name}: factorial {number} = {f}"

async def tq(flen):
    for _ in tqdm.tqdm(range(flen)):
        await asyncio.sleep(0.1)


async def main():
    # Schedule the three concurrently

    flist = [factorial("A", 2),
             factorial("B", 3),
             factorial("C", 4)]

    pbar = tqdm.tqdm(total=len(flist))
    for f in asyncio.as_completed(flist):
        value = await f
        pbar.set_description(value)
        pbar.update()

if __name__ == '__main__':
    asyncio.run(main())

pbar 格式的 Dragos 代码进行了一些小改动,并使用 tqdm.write() 几乎得到了我想要的,如下所示:

import asyncio
import random

import tqdm


async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        await asyncio.sleep(random.random())
        f *= i
    return f"Task {name}: factorial {number} = {f}"

async def tq(flen):
    for _ in tqdm.tqdm(range(flen)):
        await asyncio.sleep(0.1)


async def main():

    flist = [factorial("A", 2),
             factorial("B", 3),
             factorial("C", 4)]

    pbar = tqdm.tqdm(total=len(flist), position=0, ncols=90)
    for f in asyncio.as_completed(flist):
        value = await f
        pbar.set_description(desc=value, refresh=True)
        tqdm.tqdm.write(value)
        pbar.update()

if __name__ == '__main__':
    asyncio.run(main())

tqdm version 4.48.0 开始,可以使用 tqdm.asyncio.tqdm.as_completed()

import tqdm.asyncio
...
for f in tqdm.asyncio.tqdm.as_completed(flist):
    await f

这是一个围绕 TQDM 的异步包装器 return 有序结果:

import asyncio
from typing import Any, Coroutine, Iterable, List, Tuple

from tqdm import tqdm


async def aprogress(tasks: Iterable[Coroutine], **pbar_kws: Any) -> List[Any]:
    """Runs async tasks with a progress bar and returns an ordered result."""

    if not tasks:
        return []

    async def tup(idx: int, task: Coroutine) -> Tuple[int, Any]:
        """Returns the index and result of a task."""
        return idx, await task

    _tasks = [tup(i, t) for i, t in enumerate(tasks)]
    pbar = tqdm(asyncio.as_completed(_tasks), total=len(_tasks), **pbar_kws)
    res = [await t for t in pbar]
    return [r[1] for r in sorted(res, key=lambda r: r[0])]


if __name__ == "__main__":

    import random

    async def test(idx: int) -> Tuple[int, int]:
        sleep = random.randint(0, 5)
        await asyncio.sleep(sleep)
        return idx, sleep

    _tasks = [test(i) for i in range(10)]
    _res = asyncio.run(aprogress(_tasks, desc="pbar test"))
    print(_res)

Full source