Dask:如何有效地分发遗传搜索算法?
Dask: How to efficiently distribute a genetic search algorithm?
我已经实现了遗传搜索算法并尝试对其进行并行处理,但性能很差(比单线程还差)。我怀疑这是由于通信开销造成的。
我在下面提供了伪代码,但本质上遗传算法创建了一个大型 "Chromosome" 对象池,然后 运行s 多次迭代:
- 根据每个染色体在 'world.' 中的表现对每个染色体进行评分。世界在迭代中保持静态。
- 根据上一步计算的得分随机选择一个新人群
- 转到步骤 1 进行 n 次迭代
评分算法(第 1 步)是主要瓶颈,因此将此代码的处理分配出去似乎很自然。
我 运行 遇到了几个问题,希望能得到帮助:
- 我如何 link 使用
map()
传递给评分函数的对象计算分数,即 link 每个 Future
持有一个分数返回到一个Chromosome
?我通过 calculate_scores()
方法 return 对象以一种非常笨拙的方式完成了此操作,但实际上我需要的是发送一个 float
返回,如果有更好的方法维护 link.
- 评分函数的并行处理工作正常,但
map()
需要很长时间才能遍历所有对象。但是,与单线程版本相比,随后对 draw_chromosome_from_pool()
运行 非常 的调用缓慢到我还没有看到它完成的程度。我不知道是什么原因造成的,因为该方法在单线程版本中总是很快完成。是否有一些 IPC 继续将染色体拉回本地进程,即使在所有期货都已完成之后?本地进程是否以某种方式降低了优先级?
- 我担心 building/rebuilding 池每个周期的整体迭代性质会导致向工作人员传输大量数据。这个问题的根本问题是:Dask 实际上 什么时候以及什么时候将数据来回发送到工作池。即,Environment() 与 Chromosome() 什么时候分发,结果 how/when 会返回吗?我已经阅读了 docs,但要么没有找到正确的细节,要么太愚蠢而无法理解。
理想情况下,我认为(但欢迎更正)我想要的是一个分布式架构,其中每个工作人员在 'permanent' 的基础上在本地保存 Environment()
数据,然后 Chromosome()
实例数据分布用于评分,迭代之间几乎没有重复 back/forth 未更改的 Chromosome() 数据。
很长 post,所以如果您花时间阅读本文,谢谢!
class Chromosome(object): # Small size: several hundred bytes per instance
def get_score():
# Returns a float
def set_score(i):
# Stores a a float
class Environment(object): # Large size: 20-50Mb per instance, but only one instance
def calculate_scores(chromosome):
# Slow calculation using attributes from chromosome and instance data
chromosome.set_score(x)
return chromosome
class Evolver(object):
def draw_chromosome_from_pool(self, max_score):
while True:
individual = np.random.choice(self.chromosome_pool)
selection_chance = np.random.uniform()
if selection_chance < individual.get_score() / max_score:
return individual
def run_evolution()
self.dask_client = Client()
self.chromosome_pool = list()
for i in range(10000):
self.chromosome_pool.append( Chromosome() )
world_data = LoadWorldData() # Returns a pandas Dataframe
self.world = Environment(world_data)
iterations = 1000
for i in range(iterations):
futures = self.dask_client.map(self.world.calculate_scores, self.chromosome_pool)
for future in as_completed(futures):
c = future.result()
highest_score = max(highest_score, c.get_score())
new_pool = set()
while len(new_pool)<self.pool_size:
mother = self.draw_chromosome_from_pool(highest_score)
# do stuff to build a new pool
是的,每次你拨打电话线
futures = self.dask_client.map(self.world.calculate_scores, self.chromosome_pool)
您正在连载 self.world
,它很大。您可以在
循环之前只执行一次此操作
future_world = client.scatter(self.world, broadcast=True)
然后循环
futures = self.dask_client.map(lambda ch: Environment.calculate_scores(future_world, ch), self.chromosome_pool)
将使用 worker 上已有的副本(或执行相同操作的简单函数)。关键是 future_world
只是一个指向已经分发的东西的指针,但是 dask 会为你处理这个。
关于哪个染色体是哪个的问题:使用 as_completed
会破坏您将它们提交给 map
的顺序,但这对您的代码来说不是必需的。您可以使用 wait
来处理所有工作何时完成,或者简单地迭代 future.result()
s(这将等待每个任务完成),然后您将保留顺序chromosome_pool.
我已经实现了遗传搜索算法并尝试对其进行并行处理,但性能很差(比单线程还差)。我怀疑这是由于通信开销造成的。
我在下面提供了伪代码,但本质上遗传算法创建了一个大型 "Chromosome" 对象池,然后 运行s 多次迭代:
- 根据每个染色体在 'world.' 中的表现对每个染色体进行评分。世界在迭代中保持静态。
- 根据上一步计算的得分随机选择一个新人群
- 转到步骤 1 进行 n 次迭代
评分算法(第 1 步)是主要瓶颈,因此将此代码的处理分配出去似乎很自然。
我 运行 遇到了几个问题,希望能得到帮助:
- 我如何 link 使用
map()
传递给评分函数的对象计算分数,即 link 每个Future
持有一个分数返回到一个Chromosome
?我通过calculate_scores()
方法 return 对象以一种非常笨拙的方式完成了此操作,但实际上我需要的是发送一个float
返回,如果有更好的方法维护 link. - 评分函数的并行处理工作正常,但
map()
需要很长时间才能遍历所有对象。但是,与单线程版本相比,随后对draw_chromosome_from_pool()
运行 非常 的调用缓慢到我还没有看到它完成的程度。我不知道是什么原因造成的,因为该方法在单线程版本中总是很快完成。是否有一些 IPC 继续将染色体拉回本地进程,即使在所有期货都已完成之后?本地进程是否以某种方式降低了优先级? - 我担心 building/rebuilding 池每个周期的整体迭代性质会导致向工作人员传输大量数据。这个问题的根本问题是:Dask 实际上 什么时候以及什么时候将数据来回发送到工作池。即,Environment() 与 Chromosome() 什么时候分发,结果 how/when 会返回吗?我已经阅读了 docs,但要么没有找到正确的细节,要么太愚蠢而无法理解。
理想情况下,我认为(但欢迎更正)我想要的是一个分布式架构,其中每个工作人员在 'permanent' 的基础上在本地保存 Environment()
数据,然后 Chromosome()
实例数据分布用于评分,迭代之间几乎没有重复 back/forth 未更改的 Chromosome() 数据。
很长 post,所以如果您花时间阅读本文,谢谢!
class Chromosome(object): # Small size: several hundred bytes per instance
def get_score():
# Returns a float
def set_score(i):
# Stores a a float
class Environment(object): # Large size: 20-50Mb per instance, but only one instance
def calculate_scores(chromosome):
# Slow calculation using attributes from chromosome and instance data
chromosome.set_score(x)
return chromosome
class Evolver(object):
def draw_chromosome_from_pool(self, max_score):
while True:
individual = np.random.choice(self.chromosome_pool)
selection_chance = np.random.uniform()
if selection_chance < individual.get_score() / max_score:
return individual
def run_evolution()
self.dask_client = Client()
self.chromosome_pool = list()
for i in range(10000):
self.chromosome_pool.append( Chromosome() )
world_data = LoadWorldData() # Returns a pandas Dataframe
self.world = Environment(world_data)
iterations = 1000
for i in range(iterations):
futures = self.dask_client.map(self.world.calculate_scores, self.chromosome_pool)
for future in as_completed(futures):
c = future.result()
highest_score = max(highest_score, c.get_score())
new_pool = set()
while len(new_pool)<self.pool_size:
mother = self.draw_chromosome_from_pool(highest_score)
# do stuff to build a new pool
是的,每次你拨打电话线
futures = self.dask_client.map(self.world.calculate_scores, self.chromosome_pool)
您正在连载 self.world
,它很大。您可以在
future_world = client.scatter(self.world, broadcast=True)
然后循环
futures = self.dask_client.map(lambda ch: Environment.calculate_scores(future_world, ch), self.chromosome_pool)
将使用 worker 上已有的副本(或执行相同操作的简单函数)。关键是 future_world
只是一个指向已经分发的东西的指针,但是 dask 会为你处理这个。
关于哪个染色体是哪个的问题:使用 as_completed
会破坏您将它们提交给 map
的顺序,但这对您的代码来说不是必需的。您可以使用 wait
来处理所有工作何时完成,或者简单地迭代 future.result()
s(这将等待每个任务完成),然后您将保留顺序chromosome_pool.