使用串行输出优化分布式 I/O

Optimizing Distributed I/O with serial output

我无法理解如何优化具有串行输出的分布式组件。这是我对 openmdao 文档中给出的示例问题的尝试。

import numpy as np

import openmdao.api as om
from openmdao.utils.array_utils import evenly_distrib_idxs
from openmdao.utils.mpi import MPI


class MixedDistrib2(om.ExplicitComponent):

    def setup(self):
        # Distributed Input
        self.add_input('in_dist', shape_by_conn=True, distributed=True)
        # Serial Input
        self.add_input('in_serial', val=1)
        # Distributed Output
        self.add_output('out_dist', copy_shape='in_dist', distributed=True)
        # Serial Output
        self.add_output('out_serial', copy_shape='in_serial')
        #self.declare_partials('*','*', method='cs')

    def compute(self, inputs, outputs):
        x = inputs['in_dist']
        y = inputs['in_serial']
        # "Computationally Intensive" operation that we wish to parallelize.
        f_x = x**2 - 2.0*x + 4.0
        # These operations are repeated on all procs.
        f_y = y ** 0.5
        g_y = y**2 + 3.0*y - 5.0
        # Compute square root of our portion of the distributed input.
        g_x = x ** 0.5
        # Distributed output
        outputs['out_dist'] = f_x + f_y
        # Serial output
        if MPI and comm.size > 1:
            # We need to gather the summed values to compute the total sum over all procs.
            local_sum = np.array(np.sum(g_x))
            total_sum = local_sum.copy()
            self.comm.Allreduce(local_sum, total_sum, op=MPI.SUM)
            outputs['out_serial'] = g_y * total_sum
        else:
            # Recommended to make sure your code can run in serial too, for testing.
            outputs['out_serial'] = g_y * np.sum(g_x)

size = 7
if MPI:
    comm = MPI.COMM_WORLD
    rank = comm.rank
    sizes, offsets = evenly_distrib_idxs(comm.size, size)
else:
    # When running in serial, the entire variable is on rank 0.
    rank = 0
    sizes = {rank : size}
    offsets = {rank : 0}

prob = om.Problem()
model = prob.model

# Create a distributed source for the distributed input.
ivc = om.IndepVarComp()
ivc.add_output('x_dist', np.zeros(sizes[rank]), distributed=True)
ivc.add_output('x_serial', val=1)

model.add_subsystem("indep", ivc)
model.add_subsystem("D1", MixedDistrib2())
model.add_subsystem('con_cmp1', om.ExecComp('con1 = y**2'), promotes=['con1', 'y'])

model.connect('indep.x_dist', 'D1.in_dist')
model.connect('indep.x_serial', ['D1.in_serial','y'])

prob.driver = om.ScipyOptimizeDriver()
prob.driver.options['optimizer'] = 'SLSQP'

model.add_design_var('indep.x_serial', lower=5, upper=10)
model.add_constraint('con1', upper=90)

model.add_objective('D1.out_serial')

prob.setup(force_alloc_complex=True)
#prob.setup()

# Set initial values of distributed variable.
x_dist_init = [1,1,1,1,1,1,1]
prob.set_val('indep.x_dist', x_dist_init)

# Set initial values of serial variable.
prob.set_val('indep.x_serial', 10)

#prob.run_model()

prob.run_driver()
print('x_dist', prob.get_val('indep.x_dist', get_remote=True))
print('x_serial', prob.get_val('indep.x_serial'))
print('Obj', prob.get_val('D1.out_serial'))

问题在于用 'fd' 或 'cs' 定义分音。我无法定义部分串行输出 w.r.t 分布式输入。所以我用 prob.setup(force_alloc_complex=True) 来使用复杂的步骤。但是给我这个警告 DerivativesWarning:Constraints or objectives [('D1.out_serial', inds=[0])] cannot be impacted by the design variables of the problem. 我明白这是因为总导数是 0 导致警告但我不明白原因。显然这里的全导数不应该为 0。但我猜这是因为我没有在组件中明确地 declare_partials 。我尝试删除分布式组件并使用 declare_partials 再次 运行 它并且这工作正常(下面的代码)。

import numpy as np

import openmdao.api as om


class MixedDistrib2(om.ExplicitComponent):

    def setup(self):

        self.add_input('in_dist', np.zeros(7))
        self.add_input('in_serial', val=1)

        self.add_output('out_serial', val=0)
        self.declare_partials('*','*', method='cs')

    def compute(self, inputs, outputs):
        x = inputs['in_dist']
        y = inputs['in_serial']

        g_y = y**2 + 3.0*y - 5.0
        g_x = x ** 0.5

        outputs['out_serial'] = g_y * np.sum(g_x)    

prob = om.Problem()
model = prob.model

model.add_subsystem("D1", MixedDistrib2(), promotes_inputs=['in_dist', 'in_serial'], promotes_outputs=['out_serial'])
model.add_subsystem('con_cmp1', om.ExecComp('con1 = in_serial**2'), promotes=['con1', 'in_serial'])

prob.driver = om.ScipyOptimizeDriver()
prob.driver.options['optimizer'] = 'SLSQP'

model.add_design_var('in_serial', lower=5, upper=10)
model.add_constraint('con1', upper=90)

model.add_objective('out_serial')

prob.setup(force_alloc_complex=True)

prob.set_val('in_dist', [1,1,1,1,1,1,1])
prob.set_val('in_serial', 10)

prob.run_model()
prob.check_totals()

prob.run_driver()

print('x_dist', prob.get_val('in_dist', get_remote=True))
print('x_serial', prob.get_val('in_serial'))
print('Obj', prob.get_val('out_serial'))

我想了解的是

  1. 如何在具有串行输出的分布式组件中使用 'fd' 或 'cs'?
  2. prob.setup(force_alloc_complex=True) 是什么意思?是不是强制在所有组件中使用 cs 的问题?如果是这样为什么总导数变成0?

当我在 OpenMDAO V 3.11.0 中 运行 您的代码时(在取消注释 declare_partials 调用之后)我收到以下错误:

RuntimeError: 'D1' <class MixedDistrib2>: component has defined partial ('out_serial', 'in_dist') which is a serial output wrt a distributed input. This is only supported using the matrix free API.

如错误所示,在这种情况下,您不能将基于矩阵的 api 用于导数。原因有点微妙,可能超出了在这里回答您的问题需要处理的范围。归结为 OpenMDAO 不知道为什么在计算中进行分布式操作,并且在反向传播事物时无法管理这些细节。

所以在这种情况下你需要使用matrix-free derivative APIs。当您使用无矩阵 API 时,您 不要 声明任何部分,因为您不希望 OpenMDAO 为您分配任何内存来存储部分(并且您不会使用它内存,即使有)。

我在这里为您的示例编写了代码,但我需要注意一些重要的细节:

  1. 您的示例具有分布式 IVC,但从 OpenMDAO V3.11.0 开始,您无法获得关于分布式设计变量的全导数。我假设您只是以这种方式制作简单的测试用例,但如果您的真正问题是以这种方式设置的,您需要注意这一点而不是这样做。相反,制作 IVC 序列,并使用 src 索引将正确的部分分配给每个 proc。
  2. 在下面的例子中,导数是正确的。但是,运行 并行时 check_partials 输出中似乎存在错误。所以反向模式部分看起来像它们因通信大小而关闭......这必须在以后的版本中得到修复。
  3. 我只做了 out_serial 的导数。 out_dist 将以类似的方式工作,并留作 reader :)
  4. 的练习尺寸
  5. 您会注意到我在 computecompute_jacvec_product 方法中复制了一些代码。您可以将此重复代码抽象为它自己的方法(或通过提供您自己的输出字典从 compute_jacvec_product 中调用 compute)。但是,您可能会问为什么需要重复调​​用?为什么你不能存储来自计算调用的值。答案在很大程度上是,OpenMDAO 不保证 compute 总是在 compute_jacvec_product 之前调用。但是,我还要指出,这种代码重复非常像 AD。任何 AD 代码都将内置相同类型的重复项,即使您看不到它。
import numpy as np

import openmdao.api as om
from openmdao.utils.array_utils import evenly_distrib_idxs
from openmdao.utils.mpi import MPI


class MixedDistrib2(om.ExplicitComponent):

    def setup(self):
        # Distributed Input
        self.add_input('in_dist', shape_by_conn=True, distributed=True)
        # Serial Input
        self.add_input('in_serial', val=1)
        # Distributed Output
        self.add_output('out_dist', copy_shape='in_dist', distributed=True)
        # Serial Output
        self.add_output('out_serial', copy_shape='in_serial')

        # self.declare_partials('*','*', method='fd')

    def compute(self, inputs, outputs):
        x = inputs['in_dist']
        y = inputs['in_serial']
        # "Computationally Intensive" operation that we wish to parallelize.
        f_x = x**2 - 2.0*x + 4.0 
        # These operations are repeated on all procs.
        f_y = y ** 0.5
        g_y = y**2 + 3.0*y - 5.0
        # Compute square root of our portion of the distributed input.
        g_x = x ** 0.5
        # Distributed output
        outputs['out_dist'] = f_x + f_y
        # Serial output
        if MPI and comm.size > 1:
            # We need to gather the summed values to compute the total sum over all procs.
            local_sum = np.array(np.sum(g_x))
            total_sum = local_sum.copy()
            self.comm.Allreduce(local_sum, total_sum, op=MPI.SUM)
            outputs['out_serial'] = g_y * total_sum
        else:
            # Recommended to make sure your code can run in serial too, for testing.
            outputs['out_serial'] = g_y * np.sum(g_x)


    def compute_jacvec_product(self, inputs, d_inputs, d_outputs, mode):

        x = inputs['in_dist']
        y = inputs['in_serial']

        g_y = y**2 + 3.0*y - 5.0

        # "Computationally Intensive" operation that we wish to parallelize.
        f_x = x**2 - 2.0*x + 4.0 
        # These operations are repeated on all procs.
        f_y = y ** 0.5
        g_y = y**2 + 3.0*y - 5.0
        # Compute square root of our portion of the distributed input.
        g_x = x ** 0.5
        # Distributed output
        out_dist = f_x + f_y

        # Serial output
        if MPI and comm.size > 1:
            # We need to gather the summed values to compute the total sum over all procs.
            local_sum = np.array(np.sum(g_x))
            total_sum = local_sum.copy()
            self.comm.Allreduce(local_sum, total_sum, op=MPI.SUM)
            # total_sum
        else:
            # Recommended to make sure your code can run in serial too, for testing.
            total_sum = np.sum(g_x)

        num_x = len(x)

        d_f_x__d_x = np.diag(2*x - 2.)
        d_f_y__d_y = np.ones(num_x)*0.5*y**-0.5

        d_g_y__d_y = 2*y + 3.
        d_g_x__d_x = 0.5*x**-0.5

        d_out_dist__d_x =  d_f_x__d_x # square matrix
        d_out_dist__d_y =  d_f_y__d_y # num_x,1

        d_out_serial__d_y =  d_g_y__d_y # scalar
        d_out_serial__d_x =  g_y*d_g_x__d_x.reshape((1,num_x))

        if mode == 'fwd':
            if 'out_serial' in d_outputs:
                if 'in_dist' in d_inputs:
                    d_outputs['out_serial'] += d_out_serial__d_x.dot(d_inputs['in_dist'])
                if 'in_serial' in d_inputs:
                    d_outputs['out_serial'] += d_out_serial__d_y.dot(d_inputs['in_serial'])
        elif mode == 'rev':
            if 'out_serial' in d_outputs:
                if 'in_dist' in d_inputs:
                    d_inputs['in_dist'] += d_out_serial__d_x.T.dot(d_outputs['out_serial'])
                if 'in_serial' in d_inputs:
                    d_inputs['in_serial'] += total_sum*d_out_serial__d_y.T.dot(d_outputs['out_serial'])

size = 7
if MPI:
    comm = MPI.COMM_WORLD
    rank = comm.rank
    sizes, offsets = evenly_distrib_idxs(comm.size, size)
else:
    # When running in serial, the entire variable is on rank 0.
    rank = 0
    sizes = {rank : size}
    offsets = {rank : 0}

prob = om.Problem()
model = prob.model

# Create a distributed source for the distributed input.
ivc = om.IndepVarComp()
ivc.add_output('x_dist', np.zeros(sizes[rank]), distributed=True)
ivc.add_output('x_serial', val=1)

model.add_subsystem("indep", ivc)
model.add_subsystem("D1", MixedDistrib2())
model.add_subsystem('con_cmp1', om.ExecComp('con1 = y**2'), promotes=['con1', 'y'])

model.connect('indep.x_dist', 'D1.in_dist')
model.connect('indep.x_serial', ['D1.in_serial','y'])

prob.driver = om.ScipyOptimizeDriver()
prob.driver.options['optimizer'] = 'SLSQP'

model.add_design_var('indep.x_serial', lower=5, upper=10)
model.add_constraint('con1', upper=90)

model.add_objective('D1.out_serial')

prob.setup(force_alloc_complex=True)
#prob.setup()

# Set initial values of distributed variable.
x_dist_init = np.ones(sizes[rank])
prob.set_val('indep.x_dist', x_dist_init)

# Set initial values of serial variable.
prob.set_val('indep.x_serial', 10)

prob.run_model()

prob.check_partials()

# prob.run_driver()
print('x_dist', prob.get_val('indep.x_dist', get_remote=True))
print('x_serial', prob.get_val('indep.x_serial'))
print('Obj', prob.get_val('D1.out_serial'))