异步字符串加入 Python
Asyncify string joining in Python
我有以下代码片段,我想将其转换为异步代码(data
往往是一个大的 Iterable):
transformed_data = (do_some_transformation(d) for d in data)
stacked_jsons = "\n\n".join(json.dumps(t, separators=(",", ":")) for t in transformed_data)
我设法将 do_some_transformation
函数重写为异步函数,因此我可以执行以下操作:
transformed_data = (await do_some_transformation(d) for d in data)
async_generator = (json.dumps(event, separators=(",", ":")) async for t in transformed_data)
stacked_jsons = ???
增量加入异步生成器生成的 json 以便加入过程也是异步的最佳方法是什么?
此代码段是更大的 I/O-bound-application 的一部分,其中包含许多异步组件,因此可以从异步化所有内容中获益。
关于我的评论的更深入的解释:
如果您的处理器有很多等待要做的事情,Asyncio 是一个很好的工具。
例如:当您通过网络向数据库发出请求时,在发送请求后,您的 cpu 在得到答复之前什么都不做。
使用异步等待语法,您可以让处理器在“等待”当前任务完成的同时执行其他任务。这并不意味着 运行 它们是并行的。一次只有一项任务 运行ning。
在你的情况下(据我所知)cpu 从不等待它不断地 运行 宁字符串操作。
如果您想 运行 并行执行这些操作,您可能需要查看 ProcesPools。
这不受单个进程和内核的约束,而是将处理分散到多个内核上以并行 运行 它。
from concurrent.futures import ProcessPoolExecutor
def main():
with ProcessPoolExecutor() as executor:
transformed_data = executor.map(do_some_transformation, data) #returns an iterable
stacked_jsons = "\n\n".join(json.dumps(t, separators=(",", ":")) for t in transformed_data)
if __name__ == '__main__':
main()
希望提供的代码对您有所帮助。
ps。
if __name__
部分是必需的
编辑:我看到你关于 10k 指令的评论,假设你有 8 个内核(忽略多线程)那么每个进程只会转换 1250 个指令,而不是你的主线程现在做的 10k。这些进程 运行 同时进行,虽然性能提升不是线性的,但处理它们的速度应该快得多。
str.join
的要点是一次转换整个列表。1如果项目递增到达,它可以有利于一一积累。
async def join(by: str, _items: 'AsyncIterable[str]') -> str:
"""Asynchronously joins items with some string"""
result = ""
async for item in _items:
if result and by: # only add the separator between items
result += by
result += item
return result
async for
循环足以让异步迭代器在项目之间暂停,以便其他任务可以 运行。这种方法的主要优点是,即使对于非常多的项目,这也不会比添加下一个项目更长时间地停止事件循环。
这个实用程序可以直接消化异步生成器:
stacked_jsons = join("\n\n", (json.dumps(event, separators=(",", ":")) async for t in transformed_data))
当知道数据足够小str.join
运行s 有足够的时间时,可以直接将数据转换为list
并使用str.join
:
stacked_jsons = "\n\n".join([json.dumps(event, separators=(",", ":")) async for t in transformed_data])
[... async for ...]
构造是 asynchronous list comprehension。此 内部 以异步方式迭代,但会在获取所有项目后生成常规 list
– 只有此结果 list
会传递给 str.join
并且可以同步处理。
1 即使加入一个可迭代对象,str.join
也会先在内部将其变成 list
。
TL;DR:考虑使用 producer/consumer 模式,如果 do_some_transformation
是 IO 绑定,并且您 真的 想要增量聚合。
Of course, async itself only brings an advantage if you actually have any other proper async tasks to begin with.
正如@MisterMiyagi 所说,如果 do_some_transformation
受 IO 限制且耗时,那么将所有转换作为一大群异步任务触发可能是个好主意。
示例代码:
import asyncio
import json
data = ({"large": "data"},) * 3 # large
stacked_jsons = ""
async def transform(d: dict, q: asyncio.Queue) -> None:
# `do_some_transformation`: long IO bound task
await asyncio.sleep(1)
await q.put(d)
# WARNING: incremental concatination of string would be slow,
# since string is immutable.
async def join(q: asyncio.Queue):
global stacked_jsons
while True:
d = await q.get()
stacked_jsons += json.dumps(d, separators=(",", ":")) + "\n\n"
q.task_done()
async def main():
q = asyncio.Queue()
producers = [asyncio.create_task(transform(d, q)) for d in data]
consumer = asyncio.create_task(join(q))
await asyncio.gather(*producers)
await q.join() # Implicitly awaits consumers, too
consumer.cancel()
print(stacked_jsons)
if __name__ == "__main__":
import time
s = time.perf_counter()
asyncio.run(main())
elapsed = time.perf_counter() - s
print(f"{__file__} executed in {elapsed:0.2f} seconds.")
这样do_some_transformation
就不会互相屏蔽了。输出:
$ python test.py
{"large":"data"}
{"large":"data"}
{"large":"data"}
test.py executed in 1.00 seconds.
此外,我不认为字符串的增量连接是个好主意,因为字符串是不可变的并且会浪费大量内存;)
我有以下代码片段,我想将其转换为异步代码(data
往往是一个大的 Iterable):
transformed_data = (do_some_transformation(d) for d in data)
stacked_jsons = "\n\n".join(json.dumps(t, separators=(",", ":")) for t in transformed_data)
我设法将 do_some_transformation
函数重写为异步函数,因此我可以执行以下操作:
transformed_data = (await do_some_transformation(d) for d in data)
async_generator = (json.dumps(event, separators=(",", ":")) async for t in transformed_data)
stacked_jsons = ???
增量加入异步生成器生成的 json 以便加入过程也是异步的最佳方法是什么? 此代码段是更大的 I/O-bound-application 的一部分,其中包含许多异步组件,因此可以从异步化所有内容中获益。
关于我的评论的更深入的解释:
如果您的处理器有很多等待要做的事情,Asyncio 是一个很好的工具。 例如:当您通过网络向数据库发出请求时,在发送请求后,您的 cpu 在得到答复之前什么都不做。
使用异步等待语法,您可以让处理器在“等待”当前任务完成的同时执行其他任务。这并不意味着 运行 它们是并行的。一次只有一项任务 运行ning。
在你的情况下(据我所知)cpu 从不等待它不断地 运行 宁字符串操作。
如果您想 运行 并行执行这些操作,您可能需要查看 ProcesPools。 这不受单个进程和内核的约束,而是将处理分散到多个内核上以并行 运行 它。
from concurrent.futures import ProcessPoolExecutor
def main():
with ProcessPoolExecutor() as executor:
transformed_data = executor.map(do_some_transformation, data) #returns an iterable
stacked_jsons = "\n\n".join(json.dumps(t, separators=(",", ":")) for t in transformed_data)
if __name__ == '__main__':
main()
希望提供的代码对您有所帮助。
ps。
if __name__
部分是必需的
编辑:我看到你关于 10k 指令的评论,假设你有 8 个内核(忽略多线程)那么每个进程只会转换 1250 个指令,而不是你的主线程现在做的 10k。这些进程 运行 同时进行,虽然性能提升不是线性的,但处理它们的速度应该快得多。
str.join
的要点是一次转换整个列表。1如果项目递增到达,它可以有利于一一积累。
async def join(by: str, _items: 'AsyncIterable[str]') -> str:
"""Asynchronously joins items with some string"""
result = ""
async for item in _items:
if result and by: # only add the separator between items
result += by
result += item
return result
async for
循环足以让异步迭代器在项目之间暂停,以便其他任务可以 运行。这种方法的主要优点是,即使对于非常多的项目,这也不会比添加下一个项目更长时间地停止事件循环。
这个实用程序可以直接消化异步生成器:
stacked_jsons = join("\n\n", (json.dumps(event, separators=(",", ":")) async for t in transformed_data))
当知道数据足够小str.join
运行s 有足够的时间时,可以直接将数据转换为list
并使用str.join
:
stacked_jsons = "\n\n".join([json.dumps(event, separators=(",", ":")) async for t in transformed_data])
[... async for ...]
构造是 asynchronous list comprehension。此 内部 以异步方式迭代,但会在获取所有项目后生成常规 list
– 只有此结果 list
会传递给 str.join
并且可以同步处理。
1 即使加入一个可迭代对象,str.join
也会先在内部将其变成 list
。
TL;DR:考虑使用 producer/consumer 模式,如果 do_some_transformation
是 IO 绑定,并且您 真的 想要增量聚合。
Of course, async itself only brings an advantage if you actually have any other proper async tasks to begin with.
正如@MisterMiyagi 所说,如果 do_some_transformation
受 IO 限制且耗时,那么将所有转换作为一大群异步任务触发可能是个好主意。
示例代码:
import asyncio
import json
data = ({"large": "data"},) * 3 # large
stacked_jsons = ""
async def transform(d: dict, q: asyncio.Queue) -> None:
# `do_some_transformation`: long IO bound task
await asyncio.sleep(1)
await q.put(d)
# WARNING: incremental concatination of string would be slow,
# since string is immutable.
async def join(q: asyncio.Queue):
global stacked_jsons
while True:
d = await q.get()
stacked_jsons += json.dumps(d, separators=(",", ":")) + "\n\n"
q.task_done()
async def main():
q = asyncio.Queue()
producers = [asyncio.create_task(transform(d, q)) for d in data]
consumer = asyncio.create_task(join(q))
await asyncio.gather(*producers)
await q.join() # Implicitly awaits consumers, too
consumer.cancel()
print(stacked_jsons)
if __name__ == "__main__":
import time
s = time.perf_counter()
asyncio.run(main())
elapsed = time.perf_counter() - s
print(f"{__file__} executed in {elapsed:0.2f} seconds.")
这样do_some_transformation
就不会互相屏蔽了。输出:
$ python test.py
{"large":"data"}
{"large":"data"}
{"large":"data"}
test.py executed in 1.00 seconds.
此外,我不认为字符串的增量连接是个好主意,因为字符串是不可变的并且会浪费大量内存;)