使用 joblib 和 SLURM 在 Python 中并行化 for 循环
Parallelizing for-loops in Python using joblib & SLURM
我有一个包含 100 个元组的列表 tuplelist
,它们用作外部函数的输入。外部函数 returns 一个值,并将该值附加到一个数组中,像这样 (MainFile.py
):
from ExternalPythonFile import ExternalFunction
valuelist = []
for a,b in tuplelist:
value = ExternalFunction(a,b)
# more functions here
valuelist.append(value)
print(len(valuelist))
使用上述 for 循环时 print(len(valuelist))
的输出是 (100,)
。
现在,由于元组的顺序以及它们的附加方式对我来说并不重要,我想并行化 for 循环,因为处理 100 个元组需要大约 10 分钟,我希望缩放那个数字。我在下面尝试了一个 joblib 实现 (MainFileJoblib.py
):
from ExternalPythonFile import ExternalFunction
from joblib import Parallel, delayed, parallel_backend
import multiprocessing
valuelist = []
def TupleFunction(a,b):
value = ExternalFunction(a,b)
# more functions here
valuelist.append(value)
with parallel_backend('multiprocessing'):
Parallel(n_jobs=10)(delayed(TupleFunction)(a,b) for a,b in tuplelist)
print(len(valuelist))
我运行在 unix 计算集群上执行所有这些操作,但 运行时间仍然相似,大约 8 分钟。输出也是错误的,它打印了 (0,)
.
查看 htop
我发现实际上有 10 个核心正在使用,但每个核心仅使用 20%。
我也尝试过 运行 通过 SLURM 实现 joblib:
srun --ntasks=1 --ncpus-per-task=10 python3 MainFileJoblib.py
这在大约 2 分钟时肯定更快,但它再次给出了错误的结果 (0,)
。
并行化原始 for 循环的最佳方法是什么?
Joblib 自行管理输出列表的创建和填充,因此可以轻松修复代码:
from ExternalPythonFile import ExternalFunction
from joblib import Parallel, delayed, parallel_backend
import multiprocessing
with parallel_backend('multiprocessing'):
valuelist = Parallel(n_jobs=10)(delayed(ExternalFunction)(a, b) for a, b in tuplelist)
print(len(valuelist))
如果出于某种原因您需要更新类似数组的对象,您可以使用 numpy memmap,按照以下最小示例:
import tempfile
import numpy as np
from ExternalPythonFile import ExternalFunction
from joblib import Parallel, delayed, parallel_backend
import multiprocessing
# define function to update your array
def fill_array(mm_file, i, tuple_val):
a, b = tuple_val
value = ExternalFunction(a, b)
# more functions here
mm_file[i] = value
# create a temporary folder
tmp_dir = tempfile.mkdtemp()
# create a file where to dump your array
values_fname_memmap = Path(tmp_dir).joinpath("values_memmap")
values_memmap = np.memmap(values_fname_memmap.as_posix(),
dtype=np.float,
shape=(len(tuplelist), ),
mode='w+')
with parallel_backend('multiprocessing'):
Parallel(n_jobs=10)(delayed(fill_array)(values_memmap, i, ab)
for i, ab in enumerate(tuplelist))
print(len(values_memmap))
如果您需要对值应用一组转换(# more functions),只需围绕 ExternalFunction 进行包装,为给定的元组输出所需的值(a, b).
尽管回复晚了,但我希望它对您仍然有用。
我有一个包含 100 个元组的列表 tuplelist
,它们用作外部函数的输入。外部函数 returns 一个值,并将该值附加到一个数组中,像这样 (MainFile.py
):
from ExternalPythonFile import ExternalFunction
valuelist = []
for a,b in tuplelist:
value = ExternalFunction(a,b)
# more functions here
valuelist.append(value)
print(len(valuelist))
使用上述 for 循环时 print(len(valuelist))
的输出是 (100,)
。
现在,由于元组的顺序以及它们的附加方式对我来说并不重要,我想并行化 for 循环,因为处理 100 个元组需要大约 10 分钟,我希望缩放那个数字。我在下面尝试了一个 joblib 实现 (MainFileJoblib.py
):
from ExternalPythonFile import ExternalFunction
from joblib import Parallel, delayed, parallel_backend
import multiprocessing
valuelist = []
def TupleFunction(a,b):
value = ExternalFunction(a,b)
# more functions here
valuelist.append(value)
with parallel_backend('multiprocessing'):
Parallel(n_jobs=10)(delayed(TupleFunction)(a,b) for a,b in tuplelist)
print(len(valuelist))
我运行在 unix 计算集群上执行所有这些操作,但 运行时间仍然相似,大约 8 分钟。输出也是错误的,它打印了 (0,)
.
查看 htop
我发现实际上有 10 个核心正在使用,但每个核心仅使用 20%。
我也尝试过 运行 通过 SLURM 实现 joblib:
srun --ntasks=1 --ncpus-per-task=10 python3 MainFileJoblib.py
这在大约 2 分钟时肯定更快,但它再次给出了错误的结果 (0,)
。
并行化原始 for 循环的最佳方法是什么?
Joblib 自行管理输出列表的创建和填充,因此可以轻松修复代码:
from ExternalPythonFile import ExternalFunction
from joblib import Parallel, delayed, parallel_backend
import multiprocessing
with parallel_backend('multiprocessing'):
valuelist = Parallel(n_jobs=10)(delayed(ExternalFunction)(a, b) for a, b in tuplelist)
print(len(valuelist))
如果出于某种原因您需要更新类似数组的对象,您可以使用 numpy memmap,按照以下最小示例:
import tempfile
import numpy as np
from ExternalPythonFile import ExternalFunction
from joblib import Parallel, delayed, parallel_backend
import multiprocessing
# define function to update your array
def fill_array(mm_file, i, tuple_val):
a, b = tuple_val
value = ExternalFunction(a, b)
# more functions here
mm_file[i] = value
# create a temporary folder
tmp_dir = tempfile.mkdtemp()
# create a file where to dump your array
values_fname_memmap = Path(tmp_dir).joinpath("values_memmap")
values_memmap = np.memmap(values_fname_memmap.as_posix(),
dtype=np.float,
shape=(len(tuplelist), ),
mode='w+')
with parallel_backend('multiprocessing'):
Parallel(n_jobs=10)(delayed(fill_array)(values_memmap, i, ab)
for i, ab in enumerate(tuplelist))
print(len(values_memmap))
如果您需要对值应用一组转换(# more functions),只需围绕 ExternalFunction 进行包装,为给定的元组输出所需的值(a, b).
尽管回复晚了,但我希望它对您仍然有用。