并行 scipy.sparse 逐元素乘法
Parallellised scipy.sparse element-wise multiplication
我有一个大数组 arr1
,形状为 (k, n)
,其中 k
和 n
的阶数都是 1e7。每行仅包含几百个非零元素并且是稀疏的。
对于每一行 k
,我需要与形状 (1, n)
的 arr2
进行逐元素乘法。
目前我使用 scipy.sparse.csc_matrix
的 multiply
方法执行此乘法,并且乘法是作为我正在最小化的函数的一部分执行的,这意味着它被评估了数千次并导致很大计算负荷。更重要的是,我发现这个函数 运行s 在单核 .
上
相反,我试图通过将数组拆分为 k
中的子数组来并行计算来找到并行计算的方法。令我沮丧的是,我发现并行化版本 运行 甚至更慢。到目前为止,我已经尝试了 Dask, Ray, and multiprocessing 中的实现。以下是我一直在具有 ~500GB RAM 和 56 个 CPU 的机器上使用的实现。
我不明白为什么并行版本运行这么慢。这是我第一次并行化自己的代码,因此非常感谢任何帮助。
设置数据(为了再现性)
import scipy.sparse as scisp
import numpy as np
import dask.array as da
import dask
import multiprocessing as mp
import ray
import psutil
rng = np.random.default_rng()
rows = np.zeros((5600, 1_000_000))
rows[:, rng.integers(low=0, high=1_000_000, size=110)] = 1
scisp_arr1 = scisp.coo_matrix(rows)
scisp_arr1 = scisp.csc_matrix(scisp_arr1)
arr2 = rng.uniform(size=(1, 1_000_000))
arr2 = scisp.csc_matrix(arr2)
arr1 = None
for i in range(1000):
big_box = scisp.vstack((arr1, scisp_arr))
arr1 = scisp.csc_matrix(arr1)
无与伦比
%%time arr1.multiply(arr2).sum()
CPU times: user 4.92 s, sys: 2.72 s, total: 7.64 s
Wall time: 7.64 s
达斯克
%%time
def f(arr1, arr2):
return arr1.multiply(arr2)
delayed_multiply = dask.delayed(f)
steps = arr1.shape[0]//56
total = []
for i in range(0, arr1.shape[0], steps):
total.append(delayed_multiply(arr1[i:i+steps], arr2).sum())
total = dask.delayed(sum)(total)
total.compute()
CPU times: user 1min 13s, sys: 49 s, total: 2min 2s
Wall time: 55.5 s
雷
ray.init(num_cpus=psutil.cpu_count())
%%time
@ray.remote
def f(arr1, arr2):
return arr1.multiply(arr2).sum()
steps = arr1.shape[0]//56
total = []
for i in range(0, arr1.shape[0], steps):
total.append(f.remote(arr1[i:i+steps], arr2))
sum(ray.get(total))
CPU times: user 52.4 s, sys: 9.39 s, total: 1min 1s
Wall time: 59.4 s
多处理
%%time
steps = arr1.shape[0]//56
chunks = [(arr1[i:i+steps], arr2) for i in range(0, arr1.shape[0], steps)]
def f(arr1, arr2):
return arr1.multiply(arr2).sum()
def main(args):
steps = arr1.shape[0]//56
pool = mp.Pool(mp.cpu_count())
result = pool.starmap(f, args)
return result
sum(main(chunks))
CPU times: user 49.8 s, sys: 41.9 s, total: 1min 31s
Wall time: 1min 39s
编辑 2021 年 11 月 18 日
在 之后,我使用 Dask 进行了以下更新尝试:
def foo(arr):
arr[:, rng.integers(low=0, high=1_000_000, size=110)] = 1
return arr
chunk_len = 0.8*psutil.virtual_memory().available // 1e6 // psutil.cpu_count() // 8
arr1 = da.zeros((5_600_000, 1_000_000), chunks=(chunk_len, 1_000_000))
arr1 = foo(arr1)
arr1 = arr1.map_blocks(sparse.COO)
result = arr1.compute()
arr1 = da.from_array(result, chunks=(chunk_len, 1_000_000))
---
%%time
arr2 = da.random.uniform(size=(1, 1_000_000))
K = (arr1*arr2).sum(axis=1)
final_result = np.log(K.compute()).sum(axis=0)
CPU times: user 2min 5s, sys: 51 s, total: 2min 56s
Wall time: 5.71 s
在单核上使用 Scipy.sparse 的相同操作得到:
arr1_scipy = result.tocsc()
---
%%time
arr2 = scisp.csc_matrix(rng.uniform(size=(1, 1_000_000)))
K = arr1_scipy.multiply(arr2).sum(axis=1)
final_result = np.log(K).sum(axis=0)
CPU times: user 4.88 s, sys: 1.65 s, total: 6.53 s
Wall time: 6.53 s
我很惊讶没有更大的改进。这仅仅是由于并行化的开销吗?能否进一步改进 Dask 实施?
如果我对您的实现的理解正确,那么在任何这些情况下您实际上都没有对数组进行分区。所以你所做的只是 运行 完全相同的工作流程,但在不同的线程上,所以“并行”执行时间是原始 运行 时间加上设置分布式作业调度程序的开销和将所有内容传递给第二个线程。
如果您想看到任何总时间改进,您必须实际重写代码以对数据子集进行操作。
在最简单的情况下,使用 dask.array.from_numpy
to split the array into multiple chunks, then rewrite your workflow to use dask.array operations rather than numpy ones. Alternatively, partition the data yourself and run your function on subsets of the array using dask distributed's client.map
(see the quickstart).
None 这些方法很简单,您需要认识到存在开销(无论是实际 compute/network usagee/memory 等还是真正的投资你的时间)在任何这些中,但如果总 运行 时间很重要,那么它是值得的。有关更多背景信息,请参阅 dask best practices 文档。
更新:
在使用 dask.array 进行迭代后,您的实现现在比单线程墙时间更快,是的,额外的 CPU 时间是开销。对于你第一次尝试这个,让它比 numpy/scipy 更快(顺便说一句,它已经被大量优化并且很可能在单线程方法的引擎盖下并行化)是一个巨大的胜利,所以帕特自己在后面。让它变得更快是一个合理的挑战,远远超出了这个问题的范围。欢迎使用并行性!
补充阅读:
在问题中更新的代码片段中,有一个 .compute
步骤,它降低了使用 dask 的好处:
# deleting/commenting out these two lines will keep arr1 as a dask array
#result = arr1.compute()
#arr1 = da.from_array(result, chunks=(chunk_len, 1_000_000))
# the downstream code doesn't need to change except for putting .compute
# at the very end, rather than inside np.log (it's a very minor impact)
arr2 = da.random.uniform(size=(1, 1_000_000))
K = (arr1*arr2).sum(axis=1)
final_result = np.log(K).sum(axis=0).compute()
如果您在笔记本中,重新运行最后一个片段第二次将显示速度提升。
我有一个大数组 arr1
,形状为 (k, n)
,其中 k
和 n
的阶数都是 1e7。每行仅包含几百个非零元素并且是稀疏的。
对于每一行 k
,我需要与形状 (1, n)
的 arr2
进行逐元素乘法。
目前我使用 scipy.sparse.csc_matrix
的 multiply
方法执行此乘法,并且乘法是作为我正在最小化的函数的一部分执行的,这意味着它被评估了数千次并导致很大计算负荷。更重要的是,我发现这个函数 运行s 在单核 .
相反,我试图通过将数组拆分为 k
中的子数组来并行计算来找到并行计算的方法。令我沮丧的是,我发现并行化版本 运行 甚至更慢。到目前为止,我已经尝试了 Dask, Ray, and multiprocessing 中的实现。以下是我一直在具有 ~500GB RAM 和 56 个 CPU 的机器上使用的实现。
我不明白为什么并行版本运行这么慢。这是我第一次并行化自己的代码,因此非常感谢任何帮助。
设置数据(为了再现性)
import scipy.sparse as scisp
import numpy as np
import dask.array as da
import dask
import multiprocessing as mp
import ray
import psutil
rng = np.random.default_rng()
rows = np.zeros((5600, 1_000_000))
rows[:, rng.integers(low=0, high=1_000_000, size=110)] = 1
scisp_arr1 = scisp.coo_matrix(rows)
scisp_arr1 = scisp.csc_matrix(scisp_arr1)
arr2 = rng.uniform(size=(1, 1_000_000))
arr2 = scisp.csc_matrix(arr2)
arr1 = None
for i in range(1000):
big_box = scisp.vstack((arr1, scisp_arr))
arr1 = scisp.csc_matrix(arr1)
无与伦比
%%time arr1.multiply(arr2).sum()
CPU times: user 4.92 s, sys: 2.72 s, total: 7.64 s
Wall time: 7.64 s
达斯克
%%time
def f(arr1, arr2):
return arr1.multiply(arr2)
delayed_multiply = dask.delayed(f)
steps = arr1.shape[0]//56
total = []
for i in range(0, arr1.shape[0], steps):
total.append(delayed_multiply(arr1[i:i+steps], arr2).sum())
total = dask.delayed(sum)(total)
total.compute()
CPU times: user 1min 13s, sys: 49 s, total: 2min 2s
Wall time: 55.5 s
雷
ray.init(num_cpus=psutil.cpu_count())
%%time
@ray.remote
def f(arr1, arr2):
return arr1.multiply(arr2).sum()
steps = arr1.shape[0]//56
total = []
for i in range(0, arr1.shape[0], steps):
total.append(f.remote(arr1[i:i+steps], arr2))
sum(ray.get(total))
CPU times: user 52.4 s, sys: 9.39 s, total: 1min 1s
Wall time: 59.4 s
多处理
%%time
steps = arr1.shape[0]//56
chunks = [(arr1[i:i+steps], arr2) for i in range(0, arr1.shape[0], steps)]
def f(arr1, arr2):
return arr1.multiply(arr2).sum()
def main(args):
steps = arr1.shape[0]//56
pool = mp.Pool(mp.cpu_count())
result = pool.starmap(f, args)
return result
sum(main(chunks))
CPU times: user 49.8 s, sys: 41.9 s, total: 1min 31s
Wall time: 1min 39s
编辑 2021 年 11 月 18 日
在
def foo(arr):
arr[:, rng.integers(low=0, high=1_000_000, size=110)] = 1
return arr
chunk_len = 0.8*psutil.virtual_memory().available // 1e6 // psutil.cpu_count() // 8
arr1 = da.zeros((5_600_000, 1_000_000), chunks=(chunk_len, 1_000_000))
arr1 = foo(arr1)
arr1 = arr1.map_blocks(sparse.COO)
result = arr1.compute()
arr1 = da.from_array(result, chunks=(chunk_len, 1_000_000))
---
%%time
arr2 = da.random.uniform(size=(1, 1_000_000))
K = (arr1*arr2).sum(axis=1)
final_result = np.log(K.compute()).sum(axis=0)
CPU times: user 2min 5s, sys: 51 s, total: 2min 56s
Wall time: 5.71 s
在单核上使用 Scipy.sparse 的相同操作得到:
arr1_scipy = result.tocsc()
---
%%time
arr2 = scisp.csc_matrix(rng.uniform(size=(1, 1_000_000)))
K = arr1_scipy.multiply(arr2).sum(axis=1)
final_result = np.log(K).sum(axis=0)
CPU times: user 4.88 s, sys: 1.65 s, total: 6.53 s
Wall time: 6.53 s
我很惊讶没有更大的改进。这仅仅是由于并行化的开销吗?能否进一步改进 Dask 实施?
如果我对您的实现的理解正确,那么在任何这些情况下您实际上都没有对数组进行分区。所以你所做的只是 运行 完全相同的工作流程,但在不同的线程上,所以“并行”执行时间是原始 运行 时间加上设置分布式作业调度程序的开销和将所有内容传递给第二个线程。
如果您想看到任何总时间改进,您必须实际重写代码以对数据子集进行操作。
在最简单的情况下,使用 dask.array.from_numpy
to split the array into multiple chunks, then rewrite your workflow to use dask.array operations rather than numpy ones. Alternatively, partition the data yourself and run your function on subsets of the array using dask distributed's client.map
(see the quickstart).
None 这些方法很简单,您需要认识到存在开销(无论是实际 compute/network usagee/memory 等还是真正的投资你的时间)在任何这些中,但如果总 运行 时间很重要,那么它是值得的。有关更多背景信息,请参阅 dask best practices 文档。
更新:
在使用 dask.array 进行迭代后,您的实现现在比单线程墙时间更快,是的,额外的 CPU 时间是开销。对于你第一次尝试这个,让它比 numpy/scipy 更快(顺便说一句,它已经被大量优化并且很可能在单线程方法的引擎盖下并行化)是一个巨大的胜利,所以帕特自己在后面。让它变得更快是一个合理的挑战,远远超出了这个问题的范围。欢迎使用并行性!
补充阅读:
在问题中更新的代码片段中,有一个 .compute
步骤,它降低了使用 dask 的好处:
# deleting/commenting out these two lines will keep arr1 as a dask array
#result = arr1.compute()
#arr1 = da.from_array(result, chunks=(chunk_len, 1_000_000))
# the downstream code doesn't need to change except for putting .compute
# at the very end, rather than inside np.log (it's a very minor impact)
arr2 = da.random.uniform(size=(1, 1_000_000))
K = (arr1*arr2).sum(axis=1)
final_result = np.log(K).sum(axis=0).compute()
如果您在笔记本中,重新运行最后一个片段第二次将显示速度提升。