如何在异步代码中处理 CPU 绑定任务
How to process a CPU-bound task in async code
我正在做一些需要异步方法的繁重处理。我的方法之一 returns 在将其添加到另一个可等待对象之前需要经过大量处理的字典列表。即
def cpu_bound_task_here(record):
```some complicated preprocessing of record```
return record
经过下面好心人的回答,我的代码现在卡住了。
async def fun():
print("Socket open")
record_count = 0
symbol = obj.symbol.replace("-", "").replace("/", "")
loop = asyncio.get_running_loop()
await obj.send()
while True:
try:
records = await obj.receive()
if not records:
continue
record_count += len(records)
所以上面的函数所做的,是它的异步流值,并在无限期地推送到 redis 之前做一些繁重的处理。我进行了必要的更改,但现在卡住了。
如输出所示,run_in_executor
returns a Future
。您需要等待它才能得到结果。
record = await loop.run_in_executor(
None, something_cpu_bound_task_here, record
)
请注意,something_cpu_bound_task_here
的任何参数都需要传递给 run_in_executor
。
此外,正如您提到的,这是一项 CPU-bound 任务,您需要确保使用的是 concurrent.futures.ProcessPoolExecutor
。除非你在某处调用了 loop.set_default_executor
,否则默认是 ThreadPoolExecutor
.
的一个实例
with ProcessPoolExecutor() as executor:
for record in records:
record = await loop.run_in_executor(
executor, something_cpu_bound_task_here, record
)
最后,您的 while 循环实际上是 运行 同步的。在继续处理 records
中的下一项之前,您需要等待未来,然后等待 obj.add
。您可能想要稍微重组您的代码并使用类似 gather
的东西来允许一些并发。
async def process_record(record, obj, loop, executor):
record = await loop.run_in_executor(
executor, something_cpu_bound_task_here, record
)
await obj.add(record)
async def fun():
loop = asyncio.get_running_loop()
records = await receive()
with ProcessPoolExecutor() as executor:
await asyncio.gather(
*[process_record(record, obj, loop, executor) for record in records]
)
我不确定如何处理 obj
,因为您的示例中没有定义它,但我相信您可以解决这个问题。
我正在做一些需要异步方法的繁重处理。我的方法之一 returns 在将其添加到另一个可等待对象之前需要经过大量处理的字典列表。即
def cpu_bound_task_here(record):
```some complicated preprocessing of record```
return record
经过下面好心人的回答,我的代码现在卡住了。
async def fun():
print("Socket open")
record_count = 0
symbol = obj.symbol.replace("-", "").replace("/", "")
loop = asyncio.get_running_loop()
await obj.send()
while True:
try:
records = await obj.receive()
if not records:
continue
record_count += len(records)
所以上面的函数所做的,是它的异步流值,并在无限期地推送到 redis 之前做一些繁重的处理。我进行了必要的更改,但现在卡住了。
如输出所示,run_in_executor
returns a Future
。您需要等待它才能得到结果。
record = await loop.run_in_executor(
None, something_cpu_bound_task_here, record
)
请注意,something_cpu_bound_task_here
的任何参数都需要传递给 run_in_executor
。
此外,正如您提到的,这是一项 CPU-bound 任务,您需要确保使用的是 concurrent.futures.ProcessPoolExecutor
。除非你在某处调用了 loop.set_default_executor
,否则默认是 ThreadPoolExecutor
.
with ProcessPoolExecutor() as executor:
for record in records:
record = await loop.run_in_executor(
executor, something_cpu_bound_task_here, record
)
最后,您的 while 循环实际上是 运行 同步的。在继续处理 records
中的下一项之前,您需要等待未来,然后等待 obj.add
。您可能想要稍微重组您的代码并使用类似 gather
的东西来允许一些并发。
async def process_record(record, obj, loop, executor):
record = await loop.run_in_executor(
executor, something_cpu_bound_task_here, record
)
await obj.add(record)
async def fun():
loop = asyncio.get_running_loop()
records = await receive()
with ProcessPoolExecutor() as executor:
await asyncio.gather(
*[process_record(record, obj, loop, executor) for record in records]
)
我不确定如何处理 obj
,因为您的示例中没有定义它,但我相信您可以解决这个问题。