python 对象内的并行化

Parallelization within a python object

我正在做一个模拟,我需要在许多不同的时间点计算一个昂贵的数值积分。每个被积函数都是采样时间的函数,因此我必须独立评估每个点。因为每个积分都独立于所有其他积分,所以这可以以令人尴尬的并行方式实现。

我想 运行 在 HPC 集群上执行此操作,因此我尝试使用 mpi4py 并行化此过程;但是,我当前的实现导致每个处理器执行整个计算(包括分散到其他内核),而不是仅并行化对象内部的 for 循环。正如所写的那样,使用 n 个内核所需的时间是使用一个内核所需的时间的 n 倍(这不是一个好兆头......)。

因为唯一需要花费时间的步骤是计算本身,所以我想要除特定 for 循环之外的所有内容到根节点上的 运行。

下面是我当前实现的伪代码缩减:

import numpy as np
from mpi4py import MPI

COMM = MPI.COMM_WORLD

class Integrand:

    def __init__(self, t_max, dt, **kwargs):
        self.t_max = t_max
        self.dt = dt
        self.time_sample = np.arange(0, self.t_max, self.dt)
        self.function_args = kwargs
        self.final_result = np.empty_like(self.time_sample)

    def do_integration(self):
        if COMM.rank == 0:
            times_partitioned = split(self.time_sample, COMM.size)
        else:
            times_partitioned = None

        times_partitioned = COMM.scatter(times_partitioned, root=0)

        results = np.empty(times_partitioned.shape, dtype=complex)

        for counter, t in enumerate(times_partitioned):
            results = computation(self, t, **self.function_args)

        results = MPI.COMM_WORLD.gather(results, root=0)

        if COMM.rank is 0:
            ##inter-leaf back together
            for i in range(COMM.size):
                self.final_result[i::COMM.size] = results[i]

if __name__  = '__main__':

    kwargs_set = [kwargs1, kwargs2, kwargs3, ..., kwargsN]

    for kwargs in kwargs_set:
        integrand_object = Integrand(**kwargs)
        integrand_object.do_integration()
        save_and_plot_results(integrand_object.final_result)

在不彻底改变 class 是 called/used 的方式的情况下并行化此问题的一种简单方法是使用装饰器。装饰器(如下所示)使得它不是在每个核心上创建相同的对象,而是每个核心创建一个对象,其中包含它需要评估的时间步长块。在对它们全部进行评估后,它会收集它们的结果,并 returns 一个具有完整结果的单个对象到一个核心。这个特定的实现通过在创建时强制计算积分来稍微改变 class 功能。

from functools import wraps
import numpy as np
from mpi4py import MPI

COMM = MPI.COMM_WORLD

def parallelize_integrand(integral_class):
    def split(container, count):
        return [container[_i::count] for _i in range(count)]

    @wraps(integral_class)
    def wrapper(*args,**kwargs):
         int_object = integral_class(*args, **kwargs)
         time_sample_total = int_object.time_sample
         if COMM.rank is 0:
             split_time = split(time_sample_total,COMM.size)
             final_result = np.empty_like(int_object.result)
         else:
             split_time = None
         split_time = COMM.scatter(split_time, root=0)
         int_object.time_sample = split_time
         int_object.do_integration()
         result = int_object.result
         result = COMM.gather(result, root=0)

         if COMM.rank is 0:
             for i in range(COMM.size):
                 final_result[i::COMM.size] = result[i]
             int_object.time_sample = time_sample_total
             int_object.result = final_result
             return int_object


@parallelize_integrand
class Integrand:

    def __init__(self, t_max, dt, **kwargs):
        self.t_max = t_max
        self.dt = dt
        self.time_sample = np.arange(0, self.t_max, self.dt)
        self.kwargs = kwargs
        self.result = np.empty_like(self.time_sample)

    def do_integration(self):
        for counter, t in enumerate(self.time_sample):
            result[counter] = computation(self, t, **self.kwargs)


if __name__  = '__main__':
    kwargs_set = [kwargs1, kwargs2, kwargs3, ..., kwargsN]
    for kwargs in kwargs_set:
        integrand_object = Integrand(**kwargs)
        save_and_plot_results(integrand_object.result)