如何在异步代码中处理 CPU 绑定任务

How to process a CPU-bound task in async code

我正在做一些需要异步方法的繁重处理。我的方法之一 returns 在将其添加到另一个可等待对象之前需要经过大量处理的字典列表。即

def cpu_bound_task_here(record):
    ```some complicated preprocessing of record```
    return record

经过下面好心人的回答,我的代码现在卡住了。

async def fun():
print("Socket open")
record_count = 0
symbol = obj.symbol.replace("-", "").replace("/", "")
loop = asyncio.get_running_loop()
await obj.send()

while True:
    try:
        records = await obj.receive()
        if not records:
            continue

        record_count += len(records)
        

所以上面的函数所做的,是它的异步流值,并在无限期地推送到 redis 之前做一些繁重的处理。我进行了必要的更改,但现在卡住了。

如输出所示,run_in_executor returns a Future。您需要等待它才能得到结果。

record = await loop.run_in_executor(
    None, something_cpu_bound_task_here, record
)

请注意,something_cpu_bound_task_here 的任何参数都需要传递给 run_in_executor

此外,正如您提到的,这是一项 CPU-bound 任务,您需要确保使用的是 concurrent.futures.ProcessPoolExecutor。除非你在某处调用了 loop.set_default_executor,否则默认是 ThreadPoolExecutor.

的一个实例
with ProcessPoolExecutor() as executor:
    for record in records:
        record = await loop.run_in_executor(
            executor, something_cpu_bound_task_here, record
        )

最后,您的 while 循环实际上是 运行 同步的。在继续处理 records 中的下一项之前,您需要等待未来,然后等待 obj.add。您可能想要稍微重组您的代码并使用类似 gather 的东西来允许一些并发。

async def process_record(record, obj, loop, executor):
    record = await loop.run_in_executor(
        executor, something_cpu_bound_task_here, record
    )
    await obj.add(record)

async def fun():
    loop = asyncio.get_running_loop()
    records = await receive()
    with ProcessPoolExecutor() as executor:
        await asyncio.gather(
            *[process_record(record, obj, loop, executor) for record in records]
        )
        

我不确定如何处理 obj,因为您的示例中没有定义它,但我相信您可以解决这个问题。