Dask:如何有效地分发遗传搜索算法?

Dask: How to efficiently distribute a genetic search algorithm?

我已经实现了遗传搜索算法并尝试对其进行并行处理,但性能很差(比单线程还差)。我怀疑这是由于通信开销造成的。

我在下面提供了伪代码,但本质上遗传算法创建了一个大型 "Chromosome" 对象池,然后 运行s 多次迭代:

  1. 根据每个染色体在 'world.' 中的表现对每个染色体进行评分。世界在迭代中保持静态。
  2. 根据上一步计算的得分随机选择一个新人群
  3. 转到步骤 1 进行 n 次迭代

评分算法(第 1 步)是主要瓶颈,因此将此代码的处理分配出去似乎很自然。

我 运行 遇到了几个问题,希望能得到帮助:

  1. 我如何 link 使用 map() 传递给评分函数的对象计算分数,即 link 每个 Future 持有一个分数返回到一个Chromosome?我通过 calculate_scores() 方法 return 对象以一种非常笨拙的方式完成了此操作,但实际上我需要的是发送一个 float 返回,如果有更好的方法维护 link.
  2. 评分函数的并行处理工作正常,但 map() 需要很长时间才能遍历所有对象。但是,与单线程版本相比,随后对 draw_chromosome_from_pool() 运行 非常 的调用缓慢到我还没有看到它完成的程度。我不知道是什么原因造成的,因为该方法在单线程版本中总是很快完成。是否有一些 IPC 继续将染色体拉回本地进程,即使在所有期货都已完成之后?本地进程是否以某种方式降低了优先级?
  3. 我担心 building/rebuilding 池每个周期的整体迭代性质会导致向工作人员传输大量数据。这个问题的根本问题是:Dask 实际上 什么时候以及什么时候将数据来回发送到工作池。即,Environment() 与 Chromosome() 什么时候分发,结果 how/when 会返回吗?我已经阅读了 docs,但要么没有找到正确的细节,要么太愚蠢而无法理解。

理想情况下,我认为(但欢迎更正)我想要的是一个分布式架构,其中每个工作人员在 'permanent' 的基础上在本地保存 Environment() 数据,然后 Chromosome() 实例数据分布用于评分,迭代之间几乎没有重复 back/forth 未更改的 Chromosome() 数据。

很长 post,所以如果您花时间阅读本文,谢谢!

class Chromosome(object):    # Small size: several hundred bytes per instance 
     def get_score():
          # Returns a float
     def set_score(i):
          # Stores a a float

class Environment(object):   # Large size: 20-50Mb per instance, but only one instance
         def calculate_scores(chromosome):
             # Slow calculation using attributes from chromosome and instance data
             chromosome.set_score(x)
             return chromosome

class Evolver(object):
    def draw_chromosome_from_pool(self, max_score):
        while True:
            individual = np.random.choice(self.chromosome_pool)
            selection_chance = np.random.uniform()
            if selection_chance < individual.get_score() / max_score:
                return individual   

    def run_evolution()
         self.dask_client = Client()
         self.chromosome_pool = list()
         for i in range(10000):
             self.chromosome_pool.append( Chromosome() )

         world_data = LoadWorldData() # Returns a pandas Dataframe
         self.world = Environment(world_data)

         iterations = 1000
         for i in range(iterations):
             futures = self.dask_client.map(self.world.calculate_scores, self.chromosome_pool)
             for future in as_completed(futures):
                  c = future.result()
                  highest_score = max(highest_score, c.get_score()) 

             new_pool = set()
             while len(new_pool)<self.pool_size:
                 mother = self.draw_chromosome_from_pool(highest_score)
                  # do stuff to build a new pool

是的,每次你拨打电话线

futures = self.dask_client.map(self.world.calculate_scores, self.chromosome_pool)

您正在连载 self.world,它很大。您可以在

循环之前只执行一次此操作
future_world = client.scatter(self.world, broadcast=True)

然后循环

futures = self.dask_client.map(lambda ch: Environment.calculate_scores(future_world, ch), self.chromosome_pool)

将使用 worker 上已有的副本(或执行相同操作的简单函数)。关键是 future_world 只是一个指向已经分发的东西的指针,但是 dask 会为你处理这个。

关于哪个染色体是哪个的问题:使用 as_completed 会破坏您将它们提交给 map 的顺序,但这对您的代码来说不是必需的。您可以使用 wait 来处理所有工作何时完成,或者简单地迭代 future.result()s(这将等待每个任务完成),然后您将保留顺序chromosome_pool.