并行化嵌套的大型 `(15e4 * 15e4)` for 循环以获得成对矩阵

Parallelize nested large `(15e4 * 15e4)` for loop to get a pairwise matrix

我正在尝试并行化以下代码,它为每一行创建一个成对的结果。如下图

def get_custom_value(i, j):
    first = df[df['id'] == i]
    second = df[df['id'] == j]

    return int(first['val_1']) * int(second['val_1']) +\
            int(first['val_2']) * int(second['val_2'])

df = pd.DataFrame(
    {
        'id' : range(4),
        'val_1' : [3, 4, 5, 1],
        'val_2' : [2, 3, 1, 1]
    }
)

n = df.shape[0]

result = []

for i in range(n):
    for j in range(i+1, n):
        temp_value = get_custom_value(i, j)
        result.append([i, j, temp_value])
        if len(result) > 1e5:
            # store it in a local file and reset the result object.
            # Assume here some code to write to a local file here.
            result = []

print(result)

我试过什么?下面是代码:代码挂起。没有任何错误。

import itertools
import multiprocessing

paramlist = list(itertools.combinations(df.id, 2))
pool = multiprocessing.Pool(processes = 2)
result  = pool.map(get_custom_value, paramlist)
print(result)

我可以使用 dask 吗?

实际数据有15万多条记录。即最终结果将有 (150,000 * 150,000 * 1/2) pairs/rows。鉴于结果对象的巨大尺寸,我有一个条件,如果满足则存储结果。因此,实际的 result 对象不会超过我的 RAM。

使用的算法效率很低。实际上,df['id'] == idf['id'] == j 都遍历整个 id 列,其中包含 real-world use-case 中的 150_000 项。因此,您的算法在 O(n^3) 时间内运行并执行粗略的 3_375_000_000_000_000 比较,而最佳算法在 O(n^2) 时间内运行。

此外,CPython 循环非常慢,您应该尽可能避免使用它们。按名称获取 Pandas 数据框单元格也非常慢。相反,您可以使用 向量化 Pandas/Numpy 函数 .

此外,输出效率也不高:CPython 列表有点慢(因为动态 reference-counted 对象)并且存储 (i,j) 值会消耗三倍的内存。您可以将结果存储在 矩阵 中。可能是一个稀疏数组,或者在 紧凑 Numpy 数组列表.

此外,更大的数据结构通常更慢。如果您希望非常快速地完成计算,通常需要使其 适合 CPU 缓存 (几个 MiB)。因此,要有效地处理数据帧,您当然需要 compute it in-situ.

下面是一个使用 Numpy 的相对有效的解决方案:

import numpy as np
val_1 = np.ascontiguousarray(df['val_1'].to_numpy())
val_2 = np.ascontiguousarray(df['val_2'].to_numpy())
result = val_1.reshape(-1, 1) * val_1 + val_2.reshape(-1, 1) * val_2

它生成一个 矩阵,其中 (i,j) 项可以使用 result[i, j] 找到。 reshape(-1, 1) 用于转置水平向量以获得垂直向量,然后受益于 Numpy broadcasting。请注意,您可以使用 np.triu(result, 1).

过滤 upper-triangular 部分

您可以逐行生成结果,而不是分配一个巨大的数组:

val_1 = np.ascontiguousarray(df['val_1'].to_numpy())
val_2 = np.ascontiguousarray(df['val_2'].to_numpy())

for i in range(n-1):
    first_val_1 = val_1[i]
    first_val_2 = val_2[i]
    line = first_val_1 * val_1[i+1:] + first_val_2 * val_2[i+1:]

    # Store the line if needed with the i value so to know where it is

如果你真的想从 Numpy 数组行生成一个低效列表,那么你可以使用 np.vstack((np.repeat(i, n-i-1), np.arange(i+1, n), line)).T.tolist() 来实现。但我强烈建议你不要这样做(当然没有必要使用列表)。请注意,您可以使用 np.loadnp.save.

高效地 load/store Numpy 数组

以下是不同方法在我的机器上的性能(使用 i5-9600KF 处理器,2 个 DDR4 通道达到 40 GiB/s 和一个快速的 Nvme SSD,实际上可以在 800 [=71] 上写入大文件=]) 在随机 Pandas 数据帧上,有 15_000 条记录:

Initial code:                 60500    seconds  (estimation)
Numpy matrix:                     0.71 second
Numpy line-by-line:               0.24 second

Time to store all the lines:      0.50 second   (estimation)
in a compact way on my SSD

因此,Numpy line-by-line 解决方案比初始代码快 250_000 倍!所有这一切都无需使用多核。事实上,在这种情况下,使用多核不会快很多,因为 RAM 是一种有限的共享资源,并且文件存储在大多数机器上并行处理的速度并不快(事实上,HDD 更慢 并行使用时,因为它们本质上是顺序的)。如果您真的想这样做,那么使用多处理绝对不是一个好工具。请考虑改用 Numba 或 Cython

是的,您可以为此使用 dask。但是,我建议在直接计算您感兴趣的数量的计算中添加一个缩减(汇总统计 [mean, median, ...],max/min 所有值等)。否则,您希望在 250 GB 二进制数据的范围内编写一些东西,这不是计算时间和磁盘的最有效使用 space(除非您的 use-case 要求它)。

import pandas as pd
import numpy as np
import dask.array as da

# generate fake data of desired size
num_records = int(150000)
df = pd.DataFrame(
    {
        # no need for an ID column, because you have a range index by default
        "val_1": np.random.randint(0, 500, size=num_records),
        "val_2": np.random.randint(0, 500, size=num_records),
    }
)

# index magix to walk over the upper triangle of a fortran contigous array without diagonal
# Note: this is a different order than you use in your for loops, but
# order doesn't matter here.
N = len(df) - 1
flat_index = da.arange(N * (N + 1) // 2, chunks=(1e6,))
idx_j = (da.floor((da.sqrt(1 + 8 * flat_index) - 1)) // 2 + 1).astype(int)
idx_i = flat_index - (idx_j * (idx_j + 1) // 2)


val_1 = df.loc[:, "val_1"].to_numpy().astype(int)
val_2 = df.loc[:, "val_2"].to_numpy().astype(int)
custom_value_vector = da.take(val_1, idx_i) * da.take(val_1, idx_j) + da.take(
    val_2, idx_i
) * da.take(val_2, idx_j)
result_vector = da.stack([idx_i, idx_j, custom_value_vector], axis=-1)
rechunked_result = da.rechunk(result_vector, (5e7, 3))

# now you can reduce to your desired summary statistic
# saving uses da.to_npy_stack
reduced = da.min(rechunked_result)

以上将在几秒钟后 运行,但这只是设置计算图。你仍然需要运行它

from dask.diagnostics import ProgressBar

with ProgressBar():
    value = reduced.compute()
# [########################################] | 100% Completed |  3min  3.8s

如果不能矢量化,您仍然可以使用 dask 进行计算。但是,请注意速度会明显变慢:

import pandas as pd
import numpy as np
import dask.array as da
from dask.diagnostics import ProgressBar


# generate fake data of desired size
num_records = int(150000)
df = pd.DataFrame(
    {
        "id": np.arange(num_records),
        "val_1": np.random.randint(0, 500, size=num_records),
        "val_2": np.random.randint(0, 500, size=num_records),
    }
)

# index magix to walk over the upper triangle of a fortran contigous array without diagonal
# Note: this is a different order than you use in your for loops, but
# order doesn't matter here.
N = len(df) - 1
flat_index = da.arange(N * (N + 1) // 2, chunks=(1e6,))
idx_j = (da.floor((da.sqrt(1 + 8 * flat_index) - 1)) // 2).astype(int)
idx_i = flat_index - (idx_j * (idx_j + 1) // 2)
idx_j = idx_j + 1

def get_custom_value(multi_idx):
    i, j = multi_idx  # I added this
    first = df[df['id'] == i]
    second = df[df['id'] == j]

    return int(first['val_1']) * int(second['val_1']) +\
            int(first['val_2']) * int(second['val_2'])

multi_idx = da.stack((idx_i, idx_j), axis=-1)
custom_value = da.apply_along_axis(get_custom_value, -1, multi_idx, dtype=int, shape=tuple())
result = da.stack([idx_i, idx_j, custom_value])
reduced = da.min(result)


with ProgressBar():
    value = reduced.compute(scheduler="processes")

我没有运行完成这个,所以我不能分享任何时间安排。 (我在大约 10 分钟后停了下来。这里的重点是:它有效,但你需要有足够的耐心。)

让我们将问题重新定义为构建和处理两个数据集(或数据集本身)的笛卡尔积。 在分布式设置中,通常按以下方式解决: 让我们调用我们的数据集 A 和 B

  • 我们在集群上划分更大的数据集
  • 我们向集群中的每台机器广播(复制)较小的数据集
  • 我们在每台机器上计算和处理笛卡尔积。 如果较小的数据集不是“那么小”并且我们不能广播它,我们可以迭代工作,每次广播它的另一个分区。

在我们的例子中,我们有一个数据集,而且它很小。 那么应该做的是:

  • 我们应该通过创建具有足够分区的 dask 数据帧来在集群上对我们的数据帧进行分区。让我们称之为“A_Partitioned”
  • 我们应该在集群上使用分散函数在集群上广播相同的数据集http://distributed.dask.org/en/stable/locality.html我们称之为A_Broadcasted
  • 现在我们可以在 A_Partitioned 上执行 map_partiions,它将在分区和 A_Broadcasted
  • 上执行嵌套循环