异步字符串加入 Python

Asyncify string joining in Python

我有以下代码片段,我想将其转换为异步代码(data 往往是一个大的 Iterable):

transformed_data = (do_some_transformation(d) for d in data)
stacked_jsons = "\n\n".join(json.dumps(t, separators=(",", ":")) for t in transformed_data)

我设法将 do_some_transformation 函数重写为异步函数,因此我可以执行以下操作:

transformed_data = (await do_some_transformation(d) for d in data)
async_generator = (json.dumps(event, separators=(",", ":")) async for t in transformed_data)
stacked_jsons = ???

增量加入异步生成器生成的 json 以便加入过程也是异步的最佳方法是什么? 此代码段是更大的 I/O-bound-application 的一部分,其中包含许多异步组件,因此可以从异步化所有内容中获益。

关于我的评论的更深入的解释:

如果您的处理器有很多等待要做的事情,Asyncio 是一个很好的工具。 例如:当您通过网络向数据库发出请求时,在发送请求后,您的 cpu 在得到答复之前什么都不做。

使用异步等待语法,您可以让处理器在“等待”当前任务完成的同时执行其他任务。这并不意味着 运行 它们是并行的。一次只有一项任务 运行ning。

在你的情况下(据我所知)cpu 从不等待它不断地 运行 宁字符串操作。

如果您想 运行 并行执行这些操作,您可能需要查看 ProcesPools。 这不受单个进程和内核的约束,而是将处理分散到多个内核上以并行 运行 它。

from concurrent.futures import ProcessPoolExecutor

def main():
    with ProcessPoolExecutor() as executor:
        transformed_data = executor.map(do_some_transformation, data) #returns an iterable

    stacked_jsons = "\n\n".join(json.dumps(t, separators=(",", ":")) for t in transformed_data)

if __name__ == '__main__':
    main()

希望提供的代码对您有所帮助。

ps。 if __name__ 部分是必需的

编辑:我看到你关于 10k 指令的评论,假设你有 8 个内核(忽略多线程)那么每个进程只会转换 1250 个指令,而不是你的主线程现在做的 10k。这些进程 运行 同时进行,虽然性能提升不是线性的,但处理它们的速度应该快得多。

str.join的要点是一次转换整个列表1如果项目递增到达,它可以有利于一一积累。

async def join(by: str, _items: 'AsyncIterable[str]') -> str:
    """Asynchronously joins items with some string"""
    result = ""
    async for item in _items:
        if result and by:  # only add the separator between items
            result += by
        result += item
    return result

async for 循环足以让异步迭代器在项目之间暂停,以便其他任务可以 运行。这种方法的主要优点是,即使对于非常多的项目,这也不会比添加下一个项目更长时间地停止事件循环。

这个实用程序可以直接消化异步生成器:

stacked_jsons = join("\n\n", (json.dumps(event, separators=(",", ":")) async for t in transformed_data))

当知道数据足够小str.join 运行s 有足够的时间时,可以直接将数据转换为list并使用str.join:

stacked_jsons = "\n\n".join([json.dumps(event, separators=(",", ":")) async for t in transformed_data])

[... async for ...] 构造是 asynchronous list comprehension。此 内部 以异步方式迭代,但会在获取所有项目后生成常规 list – 只有此结果 list 会传递给 str.join 并且可以同步处理。


1 即使加入一个可迭代对象,str.join 也会先在内部将其变成 list

TL;DR:考虑使用 producer/consumer 模式,如果 do_some_transformation 是 IO 绑定,并且您 真的 想要增量聚合。

Of course, async itself only brings an advantage if you actually have any other proper async tasks to begin with.

正如@MisterMiyagi 所说,如果 do_some_transformation 受 IO 限制且耗时,那么将所有转换作为一大群异步任务触发可能是个好主意。

示例代码:

import asyncio
import json


data = ({"large": "data"},) * 3  # large
stacked_jsons = ""


async def transform(d: dict, q: asyncio.Queue) -> None:
    # `do_some_transformation`: long IO bound task
    await asyncio.sleep(1)
    await q.put(d)


# WARNING: incremental concatination of string would be slow,
# since string is immutable.
async def join(q: asyncio.Queue):
    global stacked_jsons
    while True:
        d = await q.get()
        stacked_jsons += json.dumps(d, separators=(",", ":")) + "\n\n"
        q.task_done()


async def main():
    q = asyncio.Queue()
    producers = [asyncio.create_task(transform(d, q)) for d in data]
    consumer = asyncio.create_task(join(q))
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers, too
    consumer.cancel()
    print(stacked_jsons)


if __name__ == "__main__":
    import time

    s = time.perf_counter()
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

这样do_some_transformation就不会互相屏蔽了。输出:

$ python test.py
{"large":"data"}
{"large":"data"}
{"large":"data"}

test.py executed in 1.00 seconds.

此外,我不认为字符串的增量连接是个好主意,因为字符串是不可变的并且会浪费大量内存;)

参考:Async IO in Python: A Complete Walkthrough - Real Python