numpy 数组并行处理的问题

Issues with parallelizing processing of numpy array

我在尝试加快程序计算时遇到问题。在我的代码的序列化 python 版本中,我正在计算函数 f(x) 的值,它 returns 一个浮点数,用于滑动 NumPy 数组的 windows如下所示:

a = np.array([i for i in range(1, 10000000)]) # Some data here
N = 100
result = []
for i in range(N, len(a)):
    result.append(f(a[i - N:i]))

由于 NumPy 数组非常大并且 f(x) 运行时间很高,我尝试应用多处理来加速我的代码。通过我的研究,我发现 charm4py 可能是一个很好的解决方案,它有一个 Pool 功能,可以将数组分解成块并在生成的进程之间分配工作。我已经实现了 charm4py 的多处理示例,然后将其转换为我的案例:

# Split an array into subarrays for sequential processing (takes only 5 seconds)
a = np.array([a[i - N:i] for i in range(N, len(a))])
result = charm.pool.map(f, a, chunksize=512, ncores=-1)
# I'm running this code through "charmrun +p18 example.py"

我遇到的问题是代码 运行 慢很多,尽管是在更强大的实例上执行的(18 个物理内核与 6 个物理内核)。

我曾期望看到大约 3 倍的改进,但它没有发生。在寻找解决方案时,我了解到昂贵的 deserialization/spinning 新流程会产生一些开销,但我不确定是否属于这种情况。

对于如何实现 NumPy 数组的快速并行处理的任何反馈或建议,我将不胜感激(假设函数 f(x) 未矢量化,需要相当长的时间来计算,并且在内部生成一个大的无法并行化的 specific/individual 个调用数)?

谢谢!

这是一个基于您的代码片段的示例,它使用 Ray 并行化数组计算。

请注意,执行此操作的最佳方法将取决于您的函数 f 的外观。

import numpy as np
import ray
import time

ray.init()

N = 100000

a = np.arange(10**7)
a_id = ray.put(a)

@ray.remote
def f(array, index):
    # Do processing
    time.sleep(0.2)
    return 1

result_ids = []
for i in range(len(a) // N):
    result_ids.append(f.remote(a_id, i))

results = ray.get(result_ids)

听起来您正在尝试将此操作与 Charm 或 Ray 并行化(尚不清楚如何同时使用两者)。

如果您选择使用 Ray,并且您的数据是一个 numpy 数组,您可以利用 zero-copy reads 来避免任何反序列化开销。

您可能想稍微优化一下滑动 window 功能,但它可能看起来像这样:

@ray.remote
def apply_rolling(f, arr, start, end, window_size):
    results_arr = []
    for i in range(start, end - window_size):
        results_arr.append(f(arr[i : i + windows_size])
    return np.array(results_arr)

请注意,此结构允许我们在单个任务(也称为批处理)中多次调用 f

要使用我们的功能:

# Some small setup
big_arr = np.arange(10000000)
big_arr_ref = ray.put(big_arr)

batch_size = len(big_arr) // ray.available_resources()["CPU"]
window_size = 100

# Kick off our tasks
result_refs = []
for i in range(0, big_arr, batch_size):
    end_point = min(i + batch_size, len(big_arr))
    ref = apply_rolling.remote(f, big_arr_ref, i, end_point)
    result_refs.append(ref)


# Handle the results
flattened = []
for section in ray.get(result_refs):
    flattened.extend(section)

我确定您会想要自定义此代码,但这里有 2 个您可能想要维护的重要且不错的属性。

批处理:我们正在利用批处理来避免启动太多任务。在任何系统中,并行化都会产生开销,所以我们总是要小心并确保我们不会启动太多任务。此外,我们正在计算 batch_size = len(big_arr) // ray.available_resources()["CPU"] 以确保我们使用与 CPU 数量完全相同的批处理数。

共享内存:由于 Ray 的对象存储支持从 numpy 数组读取零拷贝,调用 ray.get 或从 numpy 数组读取几乎是免费的(在单个没有网络成本的机器)。但是 在 serializing/calling ray.put 中有一些开销,所以这种方法只调用一次 put(昂贵的操作),并且 ray.get (隐式调用)多次。

提示:将数组作为参数直接传递给远程函数时要小心。它会多次调用 ray.put即使你传递同一个对象