Python Asyncio/Trio 用于异步 Computing/Fetching

Python Asyncio/Trio for Asynchronous Computing/Fetching

我正在寻找一种方法来有效地从磁盘中获取一大块值,然后在该大块上执行 computation/calculations。我的想法是一个 for 循环,它首先 运行 磁盘获取任务,然后 运行 对获取的数据进行计算。我想让我的程序获取下一批数据,因为它正在计算 运行,这样我就不必在每次计算完成时都等待另一次数据获取。我预计计算将花费比从磁盘获取数据更长的时间,并且可能无法真正并行完成,因为单个计算任务已经将 cpu 使用率固定在接近 100%。

我在下面 python 中使用 trio 提供了一些代码(但也可以与 asyncio 一起使用以达到相同的效果)来说明我使用异步编程执行此操作的最佳尝试:

import trio
import numpy as np
from datetime import datetime as dt
import time

testiters=10
dim = 6000


def generateMat(arrlen):
    for _ in range(30):
        retval= np.random.rand(arrlen, arrlen)
    # print("matrix generated")
    return retval

def computeOpertion(matrix):
    return np.linalg.inv(matrix)


def runSync():
    for _ in range(testiters):
        mat=generateMat(dim)
        result=computeOpertion(mat)
    return result

async def matGenerator_Async(count):
    for _ in range(count):
        yield generateMat(dim)

async def computeOpertion_Async(matrix):
    return computeOpertion(matrix)

async def runAsync():
    async with trio.open_nursery() as nursery:
        async for value in matGenerator_Async(testiters): 
            nursery.start_soon(computeOpertion_Async,value)
            #await computeOpertion_Async(value)

            

print("Sync:")
start=dt.now()
runSync()
print(dt.now()-start)

print("Async:")
start=dt.now()
trio.run(runAsync)
print(dt.now()-start)

此代码将通过生成 30 个随机矩阵来模拟从磁盘获取数据,其中使用了少量 cpu。然后它将对生成的矩阵执行矩阵求逆,该矩阵使用 100% cpu(在 numpy 中使用 openblas/mkl 配置)。我通过对同步和异步操作进行计时来比较 运行 任务所花费的时间。

据我所知,这两个作业完成的时间完全相同,这意味着异步操作并没有加快执行速度。观察每个计算的行为,顺序操作 运行 是按顺序获取和计算,异步操作 运行 先是所有获取,然后是所有计算。

有没有办法使用异步获取和计算?也许与期货或诸如 gather() 之类的东西? Asyncio 具有这些功能,而 trio 将它们放在单独的包中 trio_future。我也对通过其他方法(线程和多处理)的解决方案持开放态度。

我相信可能存在一个多处理解决方案,可以在单独的进程中进行磁盘读取操作 运行。但是,进程间通信和阻塞会变得很麻烦,因为由于内存限制,我需要某种信号量来控制一次可以生成多少块,而多处理往往非常繁重和缓慢。

编辑

感谢 VPfB 的回答。我无法在操作中 sleep(0) ,但我认为即使我这样做了,它也必然会阻止计算以支持执行磁盘操作。我认为这可能是 python 线程和异步的硬限制,它一次只能执行 1 个线程。 运行 两个不同的进程同时进行是不可能的,如果它们都需要等待一些外部资源从您的 CPU 响应。

也许有一个 执行器 用于多处理池的方法。我在下面添加了以下代码:

import asyncio
import concurrent.futures

async def asynciorunAsync():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:    
         async for value in matGenerator_Async(testiters):              
            result = await loop.run_in_executor(pool, computeOpertion,value)


print("Async with PoolExecutor:")
start=dt.now()
asyncio.run(asynciorunAsync())
print(dt.now()-start)

尽管计时,它仍然需要与同步示例相同的时间。我想我将不得不采用更复杂的解决方案,因为 async 和 await 似乎是一种过于粗糙的工具,无法正确执行此类任务切换。

我不使用 trio,我的回答是基于 asyncio。

在这些情况下,我认为提高异步性能的唯一方法是将计算分成更小的部分并在它们之间插入 await sleep(0)。这将允许数据获取任务 运行.

Asyncio 使用协同调度。同步 CPU 绑定例程不合作,它在 运行ning 时阻止其他所有内容。

sleep() always suspends the current task, allowing other tasks to run.

Setting the delay to 0 provides an optimized path to allow other tasks to run. This can be used by long-running functions to avoid blocking the event loop for the full duration of the function call.

(引自:asyncio.sleep


如果这不可能,请尝试 运行 executor 中的计算。这为纯异步代码添加了一些多线程功能。

async I/O 的要点是让编写程序变得容易,其中有很多网络 I/O 但实际计算(或磁盘 I/O 很少)。这适用于任何异步库(Trio 或 asyncio)甚至不同的语言(例如 C++ 中的 ASIO)。因此,您的程序在理想情况下 不适合 异步 I/O!您将需要使用多个线程(或进程)。虽然,公平地说,异步 I/O 包括 Trio 可用于协调线程上的工作,并且在您的情况下可能效果很好。

正如 VPfB 的回答所说,如果您使用的是 asyncio,那么您可以使用执行器,特别是 Trio 文档中的 ThreadPoolExecutor passed to loop.run_in_executor(). For Trio, the equivalent would be trio.to_thread.run_sync() (see also Threads (if you must),这更易于使用。在这两种情况下,您都可以 await 结果,因此该函数 运行 在单独的线程中运行,而主 Trio 线程可以继续 运行 运行您的异步代码。您的代码最终看起来像这样:

async def matGenerator_Async(count):
    for _ in range(count):
        yield await trio.to_thread.run_sync(generateMat, dim)

async def my_trio_main()
    async with trio.open_nursery() as nursery:
        async for matrix in matGenerator_Async(testiters):
             nursery.start_soon(trio.to_thread.run_sync, computeOperation, matrix)

trio.run(my_trio_main)

计算函数(generateMatcomputeOperation)不需要异步。事实上,如果它们是有问题的,因为你不能再 运行 它们在一个单独的线程中。一般来说,只有在需要 await 或使用 async withasync for.

时才创建函数 async

从上面的例子可以看出如何将数据传递给另一个线程中的函数运行ning:只需将它们作为参数传递给trio.to_thread.run_sync(),它们将作为参数传递到功能。从 generateMat() 获取结果也很简单 - 在另一个线程中调用的函数的 return 值是 return 从 await trio.to_thread.run_sync() 编辑的。获取 computeOperation() 的结果比较棘手,因为它是在 nursery 中调用的,所以它的 return 值被丢弃了。您需要向它传递一个可变参数(如 dict)并将结果存储在那里。但是要注意线程安全;最简单的方法是将一个新对象传递给每个协程,并且只有在 nursery 完成后才检查它们。

您可能会忽略的最后几个脚注:

  • 需要说明的是,上面代码中的 yield await 不是某种特殊语法。它只是 await foo(),一旦 foo() 完成,return 就是一个值,然后是该值的 yield
  • 您可以更改 Trio 用于调用 to_thread.run_sync() 的线程数,方法是传递一个 CapacityLimiter object,或者找到默认线程并设置它的计数。默认值目前似乎是 40,因此您可能想将其调低一点,但这可能不太重要。
  • 有一个普遍的误解是Python不支持线程,或者至少不能同时在多个线程中进行计算,因为它只有一个全局锁(全局解释器锁,或GIL ).这意味着您需要使用多个进程,而不是线程,让您的程序真正并行计算事物。 Python 中确实有一个 GIL,但只要你使用像 numpy 这样的东西进行计算,那么它就不会阻止多线程有效地工作。
  • Trio 实际上对 async file I/O 有很好的支持。但我认为这对您的情况没有帮助。

为了补充我的其他答案(如您所问的那样使用 Trio),这里介绍了如何使用线程而不使用任何异步库。使用 Future objects and a ThreadPoolExecutor.

执行此操作的最简单方法
futures = []
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    for matrix in matGenerator(testiters):
        futures.append(executor.submit(computeOperation, matrix))
results = [f.result() for f in futures]

该代码实际上与异步代码非常相似,但更简单。如果你不需要做网络I/O,你最好用这个方法。

我认为使用多处理并没有看到任何改进的主要问题是 CPU 的 100% 利用率。它实质上给您留下了 async-like 行为,其中资源偶尔会被释放并用于 I/O 进程。您可以为您的 ProcessPoolExecutor 设置工人数量限制,这可能允许 I/O 它需要更多准备好的空间。

免责声明:我对多处理和线程仍然是新手。