并行化嵌套的大型 `(15e4 * 15e4)` for 循环以获得成对矩阵
Parallelize nested large `(15e4 * 15e4)` for loop to get a pairwise matrix
我正在尝试并行化以下代码,它为每一行创建一个成对的结果。如下图
def get_custom_value(i, j):
first = df[df['id'] == i]
second = df[df['id'] == j]
return int(first['val_1']) * int(second['val_1']) +\
int(first['val_2']) * int(second['val_2'])
df = pd.DataFrame(
{
'id' : range(4),
'val_1' : [3, 4, 5, 1],
'val_2' : [2, 3, 1, 1]
}
)
n = df.shape[0]
result = []
for i in range(n):
for j in range(i+1, n):
temp_value = get_custom_value(i, j)
result.append([i, j, temp_value])
if len(result) > 1e5:
# store it in a local file and reset the result object.
# Assume here some code to write to a local file here.
result = []
print(result)
我试过什么?下面是代码:代码挂起。没有任何错误。
import itertools
import multiprocessing
paramlist = list(itertools.combinations(df.id, 2))
pool = multiprocessing.Pool(processes = 2)
result = pool.map(get_custom_value, paramlist)
print(result)
我可以使用 dask
吗?
实际数据有15万多条记录。即最终结果将有 (150,000 * 150,000 * 1/2) pairs/rows。鉴于结果对象的巨大尺寸,我有一个条件,如果满足则存储结果。因此,实际的 result
对象不会超过我的 RAM。
使用的算法效率很低。实际上,df['id'] == i
和 df['id'] == j
都遍历整个 id
列,其中包含 real-world use-case 中的 150_000 项。因此,您的算法在 O(n^3)
时间内运行并执行粗略的 3_375_000_000_000_000 比较,而最佳算法在 O(n^2)
时间内运行。
此外,CPython 循环非常慢,您应该尽可能避免使用它们。按名称获取 Pandas 数据框单元格也非常慢。相反,您可以使用 向量化 Pandas/Numpy 函数 .
此外,输出效率也不高:CPython 列表有点慢(因为动态 reference-counted 对象)并且存储 (i,j)
值会消耗三倍的内存。您可以将结果存储在 矩阵 中。可能是一个稀疏数组,或者在 紧凑 Numpy 数组列表.
中
此外,更大的数据结构通常更慢。如果您希望非常快速地完成计算,通常需要使其 适合 CPU 缓存 (几个 MiB)。因此,要有效地处理数据帧,您当然需要 compute it in-situ.
下面是一个使用 Numpy 的相对有效的解决方案:
import numpy as np
val_1 = np.ascontiguousarray(df['val_1'].to_numpy())
val_2 = np.ascontiguousarray(df['val_2'].to_numpy())
result = val_1.reshape(-1, 1) * val_1 + val_2.reshape(-1, 1) * val_2
它生成一个 n²
矩阵,其中 (i,j) 项可以使用 result[i, j]
找到。 reshape(-1, 1)
用于转置水平向量以获得垂直向量,然后受益于 Numpy broadcasting。请注意,您可以使用 np.triu(result, 1)
.
过滤 upper-triangular 部分
您可以逐行生成结果,而不是分配一个巨大的数组:
val_1 = np.ascontiguousarray(df['val_1'].to_numpy())
val_2 = np.ascontiguousarray(df['val_2'].to_numpy())
for i in range(n-1):
first_val_1 = val_1[i]
first_val_2 = val_2[i]
line = first_val_1 * val_1[i+1:] + first_val_2 * val_2[i+1:]
# Store the line if needed with the i value so to know where it is
如果你真的想从 Numpy 数组行生成一个低效列表,那么你可以使用 np.vstack((np.repeat(i, n-i-1), np.arange(i+1, n), line)).T.tolist()
来实现。但我强烈建议你不要这样做(当然没有必要使用列表)。请注意,您可以使用 np.load
和 np.save
.
高效地 load/store Numpy 数组
以下是不同方法在我的机器上的性能(使用 i5-9600KF 处理器,2 个 DDR4 通道达到 40 GiB/s 和一个快速的 Nvme SSD,实际上可以在 800 [=71] 上写入大文件=]) 在随机 Pandas 数据帧上,有 15_000 条记录:
Initial code: 60500 seconds (estimation)
Numpy matrix: 0.71 second
Numpy line-by-line: 0.24 second
Time to store all the lines: 0.50 second (estimation)
in a compact way on my SSD
因此,Numpy line-by-line 解决方案比初始代码快 250_000 倍!所有这一切都无需使用多核。事实上,在这种情况下,使用多核不会快很多,因为 RAM 是一种有限的共享资源,并且文件存储在大多数机器上并行处理的速度并不快(事实上,HDD 更慢 并行使用时,因为它们本质上是顺序的)。如果您真的想这样做,那么使用多处理绝对不是一个好工具。请考虑改用 Numba 或 Cython。
是的,您可以为此使用 dask。但是,我建议在直接计算您感兴趣的数量的计算中添加一个缩减(汇总统计 [mean, median, ...],max/min 所有值等)。否则,您希望在 250 GB 二进制数据的范围内编写一些东西,这不是计算时间和磁盘的最有效使用 space(除非您的 use-case 要求它)。
import pandas as pd
import numpy as np
import dask.array as da
# generate fake data of desired size
num_records = int(150000)
df = pd.DataFrame(
{
# no need for an ID column, because you have a range index by default
"val_1": np.random.randint(0, 500, size=num_records),
"val_2": np.random.randint(0, 500, size=num_records),
}
)
# index magix to walk over the upper triangle of a fortran contigous array without diagonal
# Note: this is a different order than you use in your for loops, but
# order doesn't matter here.
N = len(df) - 1
flat_index = da.arange(N * (N + 1) // 2, chunks=(1e6,))
idx_j = (da.floor((da.sqrt(1 + 8 * flat_index) - 1)) // 2 + 1).astype(int)
idx_i = flat_index - (idx_j * (idx_j + 1) // 2)
val_1 = df.loc[:, "val_1"].to_numpy().astype(int)
val_2 = df.loc[:, "val_2"].to_numpy().astype(int)
custom_value_vector = da.take(val_1, idx_i) * da.take(val_1, idx_j) + da.take(
val_2, idx_i
) * da.take(val_2, idx_j)
result_vector = da.stack([idx_i, idx_j, custom_value_vector], axis=-1)
rechunked_result = da.rechunk(result_vector, (5e7, 3))
# now you can reduce to your desired summary statistic
# saving uses da.to_npy_stack
reduced = da.min(rechunked_result)
以上将在几秒钟后 运行,但这只是设置计算图。你仍然需要运行它
from dask.diagnostics import ProgressBar
with ProgressBar():
value = reduced.compute()
# [########################################] | 100% Completed | 3min 3.8s
如果不能矢量化,您仍然可以使用 dask 进行计算。但是,请注意速度会明显变慢:
import pandas as pd
import numpy as np
import dask.array as da
from dask.diagnostics import ProgressBar
# generate fake data of desired size
num_records = int(150000)
df = pd.DataFrame(
{
"id": np.arange(num_records),
"val_1": np.random.randint(0, 500, size=num_records),
"val_2": np.random.randint(0, 500, size=num_records),
}
)
# index magix to walk over the upper triangle of a fortran contigous array without diagonal
# Note: this is a different order than you use in your for loops, but
# order doesn't matter here.
N = len(df) - 1
flat_index = da.arange(N * (N + 1) // 2, chunks=(1e6,))
idx_j = (da.floor((da.sqrt(1 + 8 * flat_index) - 1)) // 2).astype(int)
idx_i = flat_index - (idx_j * (idx_j + 1) // 2)
idx_j = idx_j + 1
def get_custom_value(multi_idx):
i, j = multi_idx # I added this
first = df[df['id'] == i]
second = df[df['id'] == j]
return int(first['val_1']) * int(second['val_1']) +\
int(first['val_2']) * int(second['val_2'])
multi_idx = da.stack((idx_i, idx_j), axis=-1)
custom_value = da.apply_along_axis(get_custom_value, -1, multi_idx, dtype=int, shape=tuple())
result = da.stack([idx_i, idx_j, custom_value])
reduced = da.min(result)
with ProgressBar():
value = reduced.compute(scheduler="processes")
我没有运行完成这个,所以我不能分享任何时间安排。 (我在大约 10 分钟后停了下来。这里的重点是:它有效,但你需要有足够的耐心。)
让我们将问题重新定义为构建和处理两个数据集(或数据集本身)的笛卡尔积。
在分布式设置中,通常按以下方式解决:
让我们调用我们的数据集 A 和 B
- 我们在集群上划分更大的数据集
- 我们向集群中的每台机器广播(复制)较小的数据集
- 我们在每台机器上计算和处理笛卡尔积。
如果较小的数据集不是“那么小”并且我们不能广播它,我们可以迭代工作,每次广播它的另一个分区。
在我们的例子中,我们有一个数据集,而且它很小。
那么应该做的是:
- 我们应该通过创建具有足够分区的 dask 数据帧来在集群上对我们的数据帧进行分区。让我们称之为“A_Partitioned”
- 我们应该在集群上使用分散函数在集群上广播相同的数据集http://distributed.dask.org/en/stable/locality.html我们称之为A_Broadcasted
- 现在我们可以在 A_Partitioned 上执行 map_partiions,它将在分区和 A_Broadcasted
上执行嵌套循环
我正在尝试并行化以下代码,它为每一行创建一个成对的结果。如下图
def get_custom_value(i, j):
first = df[df['id'] == i]
second = df[df['id'] == j]
return int(first['val_1']) * int(second['val_1']) +\
int(first['val_2']) * int(second['val_2'])
df = pd.DataFrame(
{
'id' : range(4),
'val_1' : [3, 4, 5, 1],
'val_2' : [2, 3, 1, 1]
}
)
n = df.shape[0]
result = []
for i in range(n):
for j in range(i+1, n):
temp_value = get_custom_value(i, j)
result.append([i, j, temp_value])
if len(result) > 1e5:
# store it in a local file and reset the result object.
# Assume here some code to write to a local file here.
result = []
print(result)
我试过什么?下面是代码:代码挂起。没有任何错误。
import itertools
import multiprocessing
paramlist = list(itertools.combinations(df.id, 2))
pool = multiprocessing.Pool(processes = 2)
result = pool.map(get_custom_value, paramlist)
print(result)
我可以使用 dask
吗?
实际数据有15万多条记录。即最终结果将有 (150,000 * 150,000 * 1/2) pairs/rows。鉴于结果对象的巨大尺寸,我有一个条件,如果满足则存储结果。因此,实际的 result
对象不会超过我的 RAM。
使用的算法效率很低。实际上,df['id'] == i
和 df['id'] == j
都遍历整个 id
列,其中包含 real-world use-case 中的 150_000 项。因此,您的算法在 O(n^3)
时间内运行并执行粗略的 3_375_000_000_000_000 比较,而最佳算法在 O(n^2)
时间内运行。
此外,CPython 循环非常慢,您应该尽可能避免使用它们。按名称获取 Pandas 数据框单元格也非常慢。相反,您可以使用 向量化 Pandas/Numpy 函数 .
此外,输出效率也不高:CPython 列表有点慢(因为动态 reference-counted 对象)并且存储 (i,j)
值会消耗三倍的内存。您可以将结果存储在 矩阵 中。可能是一个稀疏数组,或者在 紧凑 Numpy 数组列表.
此外,更大的数据结构通常更慢。如果您希望非常快速地完成计算,通常需要使其 适合 CPU 缓存 (几个 MiB)。因此,要有效地处理数据帧,您当然需要 compute it in-situ.
下面是一个使用 Numpy 的相对有效的解决方案:
import numpy as np
val_1 = np.ascontiguousarray(df['val_1'].to_numpy())
val_2 = np.ascontiguousarray(df['val_2'].to_numpy())
result = val_1.reshape(-1, 1) * val_1 + val_2.reshape(-1, 1) * val_2
它生成一个 n²
矩阵,其中 (i,j) 项可以使用 result[i, j]
找到。 reshape(-1, 1)
用于转置水平向量以获得垂直向量,然后受益于 Numpy broadcasting。请注意,您可以使用 np.triu(result, 1)
.
您可以逐行生成结果,而不是分配一个巨大的数组:
val_1 = np.ascontiguousarray(df['val_1'].to_numpy())
val_2 = np.ascontiguousarray(df['val_2'].to_numpy())
for i in range(n-1):
first_val_1 = val_1[i]
first_val_2 = val_2[i]
line = first_val_1 * val_1[i+1:] + first_val_2 * val_2[i+1:]
# Store the line if needed with the i value so to know where it is
如果你真的想从 Numpy 数组行生成一个低效列表,那么你可以使用 np.vstack((np.repeat(i, n-i-1), np.arange(i+1, n), line)).T.tolist()
来实现。但我强烈建议你不要这样做(当然没有必要使用列表)。请注意,您可以使用 np.load
和 np.save
.
以下是不同方法在我的机器上的性能(使用 i5-9600KF 处理器,2 个 DDR4 通道达到 40 GiB/s 和一个快速的 Nvme SSD,实际上可以在 800 [=71] 上写入大文件=]) 在随机 Pandas 数据帧上,有 15_000 条记录:
Initial code: 60500 seconds (estimation)
Numpy matrix: 0.71 second
Numpy line-by-line: 0.24 second
Time to store all the lines: 0.50 second (estimation)
in a compact way on my SSD
因此,Numpy line-by-line 解决方案比初始代码快 250_000 倍!所有这一切都无需使用多核。事实上,在这种情况下,使用多核不会快很多,因为 RAM 是一种有限的共享资源,并且文件存储在大多数机器上并行处理的速度并不快(事实上,HDD 更慢 并行使用时,因为它们本质上是顺序的)。如果您真的想这样做,那么使用多处理绝对不是一个好工具。请考虑改用 Numba 或 Cython。
是的,您可以为此使用 dask。但是,我建议在直接计算您感兴趣的数量的计算中添加一个缩减(汇总统计 [mean, median, ...],max/min 所有值等)。否则,您希望在 250 GB 二进制数据的范围内编写一些东西,这不是计算时间和磁盘的最有效使用 space(除非您的 use-case 要求它)。
import pandas as pd
import numpy as np
import dask.array as da
# generate fake data of desired size
num_records = int(150000)
df = pd.DataFrame(
{
# no need for an ID column, because you have a range index by default
"val_1": np.random.randint(0, 500, size=num_records),
"val_2": np.random.randint(0, 500, size=num_records),
}
)
# index magix to walk over the upper triangle of a fortran contigous array without diagonal
# Note: this is a different order than you use in your for loops, but
# order doesn't matter here.
N = len(df) - 1
flat_index = da.arange(N * (N + 1) // 2, chunks=(1e6,))
idx_j = (da.floor((da.sqrt(1 + 8 * flat_index) - 1)) // 2 + 1).astype(int)
idx_i = flat_index - (idx_j * (idx_j + 1) // 2)
val_1 = df.loc[:, "val_1"].to_numpy().astype(int)
val_2 = df.loc[:, "val_2"].to_numpy().astype(int)
custom_value_vector = da.take(val_1, idx_i) * da.take(val_1, idx_j) + da.take(
val_2, idx_i
) * da.take(val_2, idx_j)
result_vector = da.stack([idx_i, idx_j, custom_value_vector], axis=-1)
rechunked_result = da.rechunk(result_vector, (5e7, 3))
# now you can reduce to your desired summary statistic
# saving uses da.to_npy_stack
reduced = da.min(rechunked_result)
以上将在几秒钟后 运行,但这只是设置计算图。你仍然需要运行它
from dask.diagnostics import ProgressBar
with ProgressBar():
value = reduced.compute()
# [########################################] | 100% Completed | 3min 3.8s
如果不能矢量化,您仍然可以使用 dask 进行计算。但是,请注意速度会明显变慢:
import pandas as pd
import numpy as np
import dask.array as da
from dask.diagnostics import ProgressBar
# generate fake data of desired size
num_records = int(150000)
df = pd.DataFrame(
{
"id": np.arange(num_records),
"val_1": np.random.randint(0, 500, size=num_records),
"val_2": np.random.randint(0, 500, size=num_records),
}
)
# index magix to walk over the upper triangle of a fortran contigous array without diagonal
# Note: this is a different order than you use in your for loops, but
# order doesn't matter here.
N = len(df) - 1
flat_index = da.arange(N * (N + 1) // 2, chunks=(1e6,))
idx_j = (da.floor((da.sqrt(1 + 8 * flat_index) - 1)) // 2).astype(int)
idx_i = flat_index - (idx_j * (idx_j + 1) // 2)
idx_j = idx_j + 1
def get_custom_value(multi_idx):
i, j = multi_idx # I added this
first = df[df['id'] == i]
second = df[df['id'] == j]
return int(first['val_1']) * int(second['val_1']) +\
int(first['val_2']) * int(second['val_2'])
multi_idx = da.stack((idx_i, idx_j), axis=-1)
custom_value = da.apply_along_axis(get_custom_value, -1, multi_idx, dtype=int, shape=tuple())
result = da.stack([idx_i, idx_j, custom_value])
reduced = da.min(result)
with ProgressBar():
value = reduced.compute(scheduler="processes")
我没有运行完成这个,所以我不能分享任何时间安排。 (我在大约 10 分钟后停了下来。这里的重点是:它有效,但你需要有足够的耐心。)
让我们将问题重新定义为构建和处理两个数据集(或数据集本身)的笛卡尔积。 在分布式设置中,通常按以下方式解决: 让我们调用我们的数据集 A 和 B
- 我们在集群上划分更大的数据集
- 我们向集群中的每台机器广播(复制)较小的数据集
- 我们在每台机器上计算和处理笛卡尔积。 如果较小的数据集不是“那么小”并且我们不能广播它,我们可以迭代工作,每次广播它的另一个分区。
在我们的例子中,我们有一个数据集,而且它很小。 那么应该做的是:
- 我们应该通过创建具有足够分区的 dask 数据帧来在集群上对我们的数据帧进行分区。让我们称之为“A_Partitioned”
- 我们应该在集群上使用分散函数在集群上广播相同的数据集http://distributed.dask.org/en/stable/locality.html我们称之为A_Broadcasted
- 现在我们可以在 A_Partitioned 上执行 map_partiions,它将在分区和 A_Broadcasted 上执行嵌套循环