python 对象内的并行化
Parallelization within a python object
我正在做一个模拟,我需要在许多不同的时间点计算一个昂贵的数值积分。每个被积函数都是采样时间的函数,因此我必须独立评估每个点。因为每个积分都独立于所有其他积分,所以这可以以令人尴尬的并行方式实现。
我想 运行 在 HPC 集群上执行此操作,因此我尝试使用 mpi4py 并行化此过程;但是,我当前的实现导致每个处理器执行整个计算(包括分散到其他内核),而不是仅并行化对象内部的 for 循环。正如所写的那样,使用 n 个内核所需的时间是使用一个内核所需的时间的 n 倍(这不是一个好兆头......)。
因为唯一需要花费时间的步骤是计算本身,所以我想要除特定 for 循环之外的所有内容到根节点上的 运行。
下面是我当前实现的伪代码缩减:
import numpy as np
from mpi4py import MPI
COMM = MPI.COMM_WORLD
class Integrand:
def __init__(self, t_max, dt, **kwargs):
self.t_max = t_max
self.dt = dt
self.time_sample = np.arange(0, self.t_max, self.dt)
self.function_args = kwargs
self.final_result = np.empty_like(self.time_sample)
def do_integration(self):
if COMM.rank == 0:
times_partitioned = split(self.time_sample, COMM.size)
else:
times_partitioned = None
times_partitioned = COMM.scatter(times_partitioned, root=0)
results = np.empty(times_partitioned.shape, dtype=complex)
for counter, t in enumerate(times_partitioned):
results = computation(self, t, **self.function_args)
results = MPI.COMM_WORLD.gather(results, root=0)
if COMM.rank is 0:
##inter-leaf back together
for i in range(COMM.size):
self.final_result[i::COMM.size] = results[i]
if __name__ = '__main__':
kwargs_set = [kwargs1, kwargs2, kwargs3, ..., kwargsN]
for kwargs in kwargs_set:
integrand_object = Integrand(**kwargs)
integrand_object.do_integration()
save_and_plot_results(integrand_object.final_result)
在不彻底改变 class 是 called/used 的方式的情况下并行化此问题的一种简单方法是使用装饰器。装饰器(如下所示)使得它不是在每个核心上创建相同的对象,而是每个核心创建一个对象,其中包含它需要评估的时间步长块。在对它们全部进行评估后,它会收集它们的结果,并 returns 一个具有完整结果的单个对象到一个核心。这个特定的实现通过在创建时强制计算积分来稍微改变 class 功能。
from functools import wraps
import numpy as np
from mpi4py import MPI
COMM = MPI.COMM_WORLD
def parallelize_integrand(integral_class):
def split(container, count):
return [container[_i::count] for _i in range(count)]
@wraps(integral_class)
def wrapper(*args,**kwargs):
int_object = integral_class(*args, **kwargs)
time_sample_total = int_object.time_sample
if COMM.rank is 0:
split_time = split(time_sample_total,COMM.size)
final_result = np.empty_like(int_object.result)
else:
split_time = None
split_time = COMM.scatter(split_time, root=0)
int_object.time_sample = split_time
int_object.do_integration()
result = int_object.result
result = COMM.gather(result, root=0)
if COMM.rank is 0:
for i in range(COMM.size):
final_result[i::COMM.size] = result[i]
int_object.time_sample = time_sample_total
int_object.result = final_result
return int_object
@parallelize_integrand
class Integrand:
def __init__(self, t_max, dt, **kwargs):
self.t_max = t_max
self.dt = dt
self.time_sample = np.arange(0, self.t_max, self.dt)
self.kwargs = kwargs
self.result = np.empty_like(self.time_sample)
def do_integration(self):
for counter, t in enumerate(self.time_sample):
result[counter] = computation(self, t, **self.kwargs)
if __name__ = '__main__':
kwargs_set = [kwargs1, kwargs2, kwargs3, ..., kwargsN]
for kwargs in kwargs_set:
integrand_object = Integrand(**kwargs)
save_and_plot_results(integrand_object.result)
我正在做一个模拟,我需要在许多不同的时间点计算一个昂贵的数值积分。每个被积函数都是采样时间的函数,因此我必须独立评估每个点。因为每个积分都独立于所有其他积分,所以这可以以令人尴尬的并行方式实现。
我想 运行 在 HPC 集群上执行此操作,因此我尝试使用 mpi4py 并行化此过程;但是,我当前的实现导致每个处理器执行整个计算(包括分散到其他内核),而不是仅并行化对象内部的 for 循环。正如所写的那样,使用 n 个内核所需的时间是使用一个内核所需的时间的 n 倍(这不是一个好兆头......)。
因为唯一需要花费时间的步骤是计算本身,所以我想要除特定 for 循环之外的所有内容到根节点上的 运行。
下面是我当前实现的伪代码缩减:
import numpy as np
from mpi4py import MPI
COMM = MPI.COMM_WORLD
class Integrand:
def __init__(self, t_max, dt, **kwargs):
self.t_max = t_max
self.dt = dt
self.time_sample = np.arange(0, self.t_max, self.dt)
self.function_args = kwargs
self.final_result = np.empty_like(self.time_sample)
def do_integration(self):
if COMM.rank == 0:
times_partitioned = split(self.time_sample, COMM.size)
else:
times_partitioned = None
times_partitioned = COMM.scatter(times_partitioned, root=0)
results = np.empty(times_partitioned.shape, dtype=complex)
for counter, t in enumerate(times_partitioned):
results = computation(self, t, **self.function_args)
results = MPI.COMM_WORLD.gather(results, root=0)
if COMM.rank is 0:
##inter-leaf back together
for i in range(COMM.size):
self.final_result[i::COMM.size] = results[i]
if __name__ = '__main__':
kwargs_set = [kwargs1, kwargs2, kwargs3, ..., kwargsN]
for kwargs in kwargs_set:
integrand_object = Integrand(**kwargs)
integrand_object.do_integration()
save_and_plot_results(integrand_object.final_result)
在不彻底改变 class 是 called/used 的方式的情况下并行化此问题的一种简单方法是使用装饰器。装饰器(如下所示)使得它不是在每个核心上创建相同的对象,而是每个核心创建一个对象,其中包含它需要评估的时间步长块。在对它们全部进行评估后,它会收集它们的结果,并 returns 一个具有完整结果的单个对象到一个核心。这个特定的实现通过在创建时强制计算积分来稍微改变 class 功能。
from functools import wraps
import numpy as np
from mpi4py import MPI
COMM = MPI.COMM_WORLD
def parallelize_integrand(integral_class):
def split(container, count):
return [container[_i::count] for _i in range(count)]
@wraps(integral_class)
def wrapper(*args,**kwargs):
int_object = integral_class(*args, **kwargs)
time_sample_total = int_object.time_sample
if COMM.rank is 0:
split_time = split(time_sample_total,COMM.size)
final_result = np.empty_like(int_object.result)
else:
split_time = None
split_time = COMM.scatter(split_time, root=0)
int_object.time_sample = split_time
int_object.do_integration()
result = int_object.result
result = COMM.gather(result, root=0)
if COMM.rank is 0:
for i in range(COMM.size):
final_result[i::COMM.size] = result[i]
int_object.time_sample = time_sample_total
int_object.result = final_result
return int_object
@parallelize_integrand
class Integrand:
def __init__(self, t_max, dt, **kwargs):
self.t_max = t_max
self.dt = dt
self.time_sample = np.arange(0, self.t_max, self.dt)
self.kwargs = kwargs
self.result = np.empty_like(self.time_sample)
def do_integration(self):
for counter, t in enumerate(self.time_sample):
result[counter] = computation(self, t, **self.kwargs)
if __name__ = '__main__':
kwargs_set = [kwargs1, kwargs2, kwargs3, ..., kwargsN]
for kwargs in kwargs_set:
integrand_object = Integrand(**kwargs)
save_and_plot_results(integrand_object.result)