How to distribute multiprocess CPU usage over multiple nodes?
I'm trying to run a job on an HPC cluster using multiprocessing. Each process has a peak memory usage of roughly 44 GB. The job class I can use allows 1-16 nodes, each with 32 CPUs and 124 GB of memory. So, to run the code as quickly as possible (and within the maximum walltime limit), I should be able to run 2 processes on each node, for a maximum of 32 across all 16 nodes. However, when I specify mp.Pool(32), the job quickly exceeds the memory limit, I assume because more than two processes are being run on a single node.
My instinct was to cap the job at 2 CPUs per node in the PBS script that launches my Python script, but the system won't accept that configuration. Any insight would be much appreciated; I've spent most of today scratching my head over this, and I've hit and worked around similar problems in the past without ever resolving the underlying issue here.
Simplified versions of the two scripts below:
#!/bin/sh
#PBS -l select=16:ncpus=32:mem=124gb
#PBS -l walltime=24:00:00
module load anaconda3/personal
source activate py_env
python directory/script.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import multiprocessing as mp

def df_function(df, arr1, arr2):
    df['col3'] = some_algorithm(df, arr1, arr2)
    return df

def parallelize_dataframe(df, func, num_cores):
    df_split = np.array_split(df, num_cores)
    with mp.Pool(num_cores, maxtasksperchild = 10 ** 3) as pool:
        df = pd.concat(pool.map(func, df_split))
    return df

def main():
    # Loading input data
    direc = '/home/dir1/dir2/'
    file = 'input_data.csv'
    a_file = 'array_a.npy'
    b_file = 'array_b.npy'
    df = pd.read_csv(direc + file)
    a = np.load(direc + a_file)
    b = np.load(direc + b_file)

    # Globally defining function with keyword defaults
    global f
    def f(df):
        return df_function(df, arr1 = a, arr2 = b)

    num_cores = 32 # i.e. 2 per node if evenly distributed.

    # Running the function as a multiprocess:
    df = parallelize_dataframe(df, f, num_cores)

    # Saving:
    df.to_csv(direc + 'outfile.csv', index = False)

if __name__ == '__main__':
    main()
To run your job as-is, you could simply request ncpus=32 and then set num_cores = 2 in your Python script. The underlying issue is that Python's multiprocessing.Pool only spawns workers on the single node the script is running on, so with mp.Pool(32) all 32 workers (at ~44 GB peak each) land on one 124 GB node, which is why you blow past the memory limit. Obviously, this has you paying for 32 cores and then leaving 30 of them idle, which is wasteful.
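As a concrete sketch of that stop-gap (the single-node request and pool size below are my assumptions based on your numbers, since two ~44 GB workers are all that fit in 124 GB):

# Stop-gap: keep your multiprocessing version, but size the pool to what one
# node can hold. mp.Pool only ever uses the node the script is launched on,
# so a single-chunk request such as "#PBS -l select=1:ncpus=32:mem=124gb"
# is enough (assumption: your scheduler accepts that request).
num_cores = 2  # 2 workers x ~44 GB peak < 124 GB per node
df = parallelize_dataframe(df, f, num_cores)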
The real problem here is that your current algorithm is memory-bound, not CPU-bound. You should be going out of your way to read only chunks of your files into memory, operate on a chunk, then write the resulting chunk to disk to be reassembled later.
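As a rough sketch of that chunked pattern with plain pandas (the chunk size, the append-mode writing, and the assumption that some_algorithm can handle one block of rows at a time are mine, not from your code):

import numpy as np
import pandas as pd

direc = '/home/dir1/dir2/'
a = np.load(direc + 'array_a.npy')
b = np.load(direc + 'array_b.npy')

first_chunk = True
# Stream the CSV in fixed-size blocks instead of loading it all at once.
for chunk in pd.read_csv(direc + 'input_data.csv', chunksize=100_000):
    chunk['col3'] = some_algorithm(chunk, a, b)  # assumes per-chunk independence
    # Append each processed chunk to the output file, writing the header only once.
    chunk.to_csv(direc + 'outfile.csv',
                 mode='w' if first_chunk else 'a',
                 header=first_chunk,
                 index=False)
    first_chunk = False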
Luckily, Dask is built to do exactly this kind of thing. As a first step, you can take out the parallelize_dataframe function and directly load and map your some_algorithm with a dask.dataframe and dask.array:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import dask.dataframe as dd
import dask.array as da

def main():
    # Loading input data
    direc = '/home/dir1/dir2/'
    file = 'input_data.csv'
    a_file = 'array_a.npy'
    b_file = 'array_b.npy'
    df = dd.read_csv(direc + file, blocksize=25e6)
    a_and_b = da.from_npy_stack(direc)
    df['col3'] = df.apply(some_algorithm, args=(a_and_b,))

    # dask is lazy, this is the only line that does any work
    # Saving:
    df.to_csv(
        direc + 'outfile.csv',
        index = False,
        compute_kwargs={"scheduler": "threads"},  # also "processes", but try threads first
    )

if __name__ == '__main__':
    main()
This will require some_algorithm to work a bit differently, and the to_csv and from_npy_stack calls behave a little differently too, but you will be able to reasonably run this on your own laptop and it will scale to your cluster hardware. From here you can level up by using the distributed scheduler, or even deploy it directly to your cluster with dask-jobqueue, with only minor adjustments.
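For that last step, a minimal sketch of what a dask-jobqueue deployment could look like (the PBSCluster arguments simply mirror your node shape; the worker count and blocksize are assumptions you would tune):

from dask.distributed import Client
from dask_jobqueue import PBSCluster
import dask.dataframe as dd

# Each PBS job backs one worker node: 32 cores and 124 GB of memory.
cluster = PBSCluster(
    cores=32,
    memory='124GB',
    walltime='24:00:00',
)
cluster.scale(jobs=16)  # ask PBS for up to 16 such nodes

client = Client(cluster)  # dask computations now run on the cluster workers

df = dd.read_csv('/home/dir1/dir2/input_data.csv', blocksize=25e6)
# ... build the same dask graph as above; to_csv/compute will use the workers ...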