遗传算法——在具有未绑定方法的 class 内部使用多处理
Genetic algorithms -- Using multiprocessing inside of a class having an unbound method
我最近一直在尝试通过求助于 class 本身内部的 multiprocessing
库来并行化我的一些代码(为了速度)是用户提供的函数,保存在 class 属性中)。根本没用。
上下文:我正在尝试并行化“并行遗传算法”class,顾名思义,它本身就是一个令人尴尬的并行问题。
据我所知,我的代码中存在 2 个问题。 (1) 用户提供的适应度函数不会在 Pool
对象生成的过程中导出,而且似乎无法解决任何深度复制问题。 (2) 另一个问题是,也许 Pool
对象不确定如何处理多维输出...我真的不确定这个 tbh。
我已经尝试编写我的代码的一个小型独立版本,只是为了让事情更清楚(目前 运行,因为存在错误,这就是重点) :
from itertools import repeat
import multiprocessing as mp
import numpy as np
class GeneticAlgorithm:
def __init__(self, I, G, D, U, fitness_function, run_parallel):
self.fitness_function = fitness_function # User-supplied fitness function
self.D = D # Problem size (number of genes)
self.I = I # Population size (number of individuals)
self.G = G # Number of generations
self.U = U # Number of parallel populations
self.run_parallel = run_parallel
def sga(self):
'''One single-threaded genetic algorithm'''
# Activation rate is fixed at 0.5 for the sake of this MWE
pop = np.random.binomial(size=self.I * self.D, n=1, p=0.5).reshape(self.I, self.D) # Seed population
for g in range(self.G):
# fitness is computed for all individuals
fitpop = np.array([self.fitness_function(ind=ind) for ind in pop])
# fitness is scaled back to 100%
fitpop /= np.sum(fitpop)
# 2I parents are selected at random according to each individual's relative fitness score
parents = np.random.choice(range(self.I), size=2 * self.I, replace=True, p=fitpop).reshape(self.I, 2)
# Parents are crossed 2 by 2, with each pair producing exactly one offspring
crossover = np.array([np.random.choice(parents[i, :], size=self.D, replace=True) for i in range(self.I)]).reshape(self.I, self.D)
embryos = np.array([[pop[crossover[i, d], d] for d in range(self.D)] for i in range(self.I)]).reshape(self.I, self.D)
# Mutation rate is fixed at 1/D for the sake of this MWE
mutations = np.random.binomial(size=self.I * self.D, n=1, p=1 / self.D).reshape(self.I, self.D)
# "Mutated embryos" become fully fledged individuals and replace the parent generation
pop = (1 - mutations) * embryos + mutations * (1 - embryos)
# Individuals are aggregated gene-wise, with the average of active and inactive genes creating a ratio
return pop.mean(axis=0)
def pga(self):
'''Multiple parallel genetic algorithms'''
if self.run_parallel:
p = mp.Pool(mp.cpu_count())
universes = p.starmap(GeneticAlgorithm.sga, zip(repeat(self, self.U)))
p.close()
p.join()
else:
universes = np.zeros(self.U * self.D).reshape(self.U, self.D)
for u in range(self.U):
universes[u, :] = self.sga()
# Multiple GAs are aggregated in a sort of "mean of means"
return universes.mean(axis=0)
if __name__ == '__main__':
def my_fitness_function(ind):
'''Dummy fitness function, scores all individual equally...'''
return 1.0
# Dummy test to check if the code runs... it doesn't :(
ga = GeneticAlgorithm(I=10, G=3, D=5, U=10, fitness_function=my_fitness_function, run_parallel=True)
print(ga.pga())
任何类型的提示、代码片段或完整的解决方案都将受到欢迎。这在 R 中曾经相当容易,但是 Python 我显然已经无计可施了……谢谢!
ETA:修正了代码中的几个拼写错误并添加了 run_parallel
参数以表明它 运行 完全没问题 没有 并行化。哦,是的,还有,我 运行 在 Windows 上,否则我可能会尝试那个 Ray
库,有人告诉我,它可以创造奇迹,尤其是与 multiprocessing
相比时。
请参阅我对您的评论 post。此外,池中的每个进程都应该唯一地播种随机数生成器。否则,它们将生成相同的随机数序列。
请注意,由于创建进程池以及将参数和结果从一个进程的地址 space 传递到另一个地址 space 的开销,多处理不一定会更快 运行。 您的工作函数 sga
必须足够 CPU 密集才能使多处理具有优势,我现在明白实际上它是 。
from itertools import repeat
import multiprocessing as mp
import numpy as np
# this will be executed by each process in the pool:
def init_pool():
from threading import current_thread
ident = current_thread().ident
np.random.seed(ident)
def my_fitness_function(ind):
'''Dummy fitness function, scores all individual equally...'''
return 1.0
class GeneticAlgorithm:
def __init__(self, I, G, D, U, fitness_function, run_parallel):
self.fitness_function = fitness_function # User-supplied fitness function
self.D = D # Problem size (number of genes)
self.I = I # Population size (number of individuals)
self.G = G # Number of generations
self.U = U # Number of parallel populations
self.run_parallel = run_parallel
def sga(self):
'''One single-threaded genetic algorithm'''
# Activation rate is fixed at 0.5 for the sake of this MWE
pop = np.random.binomial(size=self.I * self.D, n=1, p=0.5).reshape(self.I, self.D) # Seed population
for g in range(self.G):
# fitness is computed for all individuals
fitpop = np.array([self.fitness_function(ind=ind) for ind in pop])
# fitness is scaled back to 100%
fitpop /= np.sum(fitpop)
# 2I parents are selected at random according to each individual's relative fitness score
parents = np.random.choice(range(self.I), size=2 * self.I, replace=True, p=fitpop).reshape(self.I, 2)
# Parents are crossed 2 by 2, with each pair producing exactly one offspring
crossover = np.array([np.random.choice(parents[i, :], size=self.D, replace=True) for i in range(self.I)]).reshape(self.I, self.D)
embryos = np.array([[pop[crossover[i, d], d] for d in range(self.D)] for i in range(self.I)]).reshape(self.I, self.D)
# Mutation rate is fixed at 1/D for the sake of this MWE
mutations = np.random.binomial(size=self.I * self.D, n=1, p=1 / self.D).reshape(self.I, self.D)
# "Mutated embryos" become fully fledged individuals and replace the parent generation
pop = (1 - mutations) * embryos + mutations * (1 - embryos)
# Individuals are aggregated gene-wise, with the average of active and inactive genes creating a ratio
return pop.mean(axis=0)
def pga(self):
'''Multiple parallel genetic algorithms'''
universes = np.zeros(self.U * self.D).reshape(self.U, self.D)
if self.run_parallel:
pool_size = min(mp.cpu_count(), self.U)
p = mp.Pool(pool_size, initializer=init_pool)
results = p.starmap(GeneticAlgorithm.sga, zip(repeat(self, self.U)))
p.close()
p.join()
for u in range(self.U):
universes[u, :] = results[u]
else:
for u in range(self.U):
universes[u, :] = self.sga()
# Multiple GAs are aggregated in a sort of "mean of means"
return universes.mean(axis=0)
if __name__ == '__main__':
# Dummy test to check if the code runs... it doesn't :(
ga = GeneticAlgorithm(I=10, G=3, D=5, U=10, fitness_function=my_fitness_function, run_parallel=True)
print(ga.pga())
打印:
[0.56 0.46 0.38 0.54 0.52]
我最近一直在尝试通过求助于 class 本身内部的 multiprocessing
库来并行化我的一些代码(为了速度)是用户提供的函数,保存在 class 属性中)。根本没用。
上下文:我正在尝试并行化“并行遗传算法”class,顾名思义,它本身就是一个令人尴尬的并行问题。
据我所知,我的代码中存在 2 个问题。 (1) 用户提供的适应度函数不会在 Pool
对象生成的过程中导出,而且似乎无法解决任何深度复制问题。 (2) 另一个问题是,也许 Pool
对象不确定如何处理多维输出...我真的不确定这个 tbh。
我已经尝试编写我的代码的一个小型独立版本,只是为了让事情更清楚(目前 运行,因为存在错误,这就是重点) :
from itertools import repeat
import multiprocessing as mp
import numpy as np
class GeneticAlgorithm:
def __init__(self, I, G, D, U, fitness_function, run_parallel):
self.fitness_function = fitness_function # User-supplied fitness function
self.D = D # Problem size (number of genes)
self.I = I # Population size (number of individuals)
self.G = G # Number of generations
self.U = U # Number of parallel populations
self.run_parallel = run_parallel
def sga(self):
'''One single-threaded genetic algorithm'''
# Activation rate is fixed at 0.5 for the sake of this MWE
pop = np.random.binomial(size=self.I * self.D, n=1, p=0.5).reshape(self.I, self.D) # Seed population
for g in range(self.G):
# fitness is computed for all individuals
fitpop = np.array([self.fitness_function(ind=ind) for ind in pop])
# fitness is scaled back to 100%
fitpop /= np.sum(fitpop)
# 2I parents are selected at random according to each individual's relative fitness score
parents = np.random.choice(range(self.I), size=2 * self.I, replace=True, p=fitpop).reshape(self.I, 2)
# Parents are crossed 2 by 2, with each pair producing exactly one offspring
crossover = np.array([np.random.choice(parents[i, :], size=self.D, replace=True) for i in range(self.I)]).reshape(self.I, self.D)
embryos = np.array([[pop[crossover[i, d], d] for d in range(self.D)] for i in range(self.I)]).reshape(self.I, self.D)
# Mutation rate is fixed at 1/D for the sake of this MWE
mutations = np.random.binomial(size=self.I * self.D, n=1, p=1 / self.D).reshape(self.I, self.D)
# "Mutated embryos" become fully fledged individuals and replace the parent generation
pop = (1 - mutations) * embryos + mutations * (1 - embryos)
# Individuals are aggregated gene-wise, with the average of active and inactive genes creating a ratio
return pop.mean(axis=0)
def pga(self):
'''Multiple parallel genetic algorithms'''
if self.run_parallel:
p = mp.Pool(mp.cpu_count())
universes = p.starmap(GeneticAlgorithm.sga, zip(repeat(self, self.U)))
p.close()
p.join()
else:
universes = np.zeros(self.U * self.D).reshape(self.U, self.D)
for u in range(self.U):
universes[u, :] = self.sga()
# Multiple GAs are aggregated in a sort of "mean of means"
return universes.mean(axis=0)
if __name__ == '__main__':
def my_fitness_function(ind):
'''Dummy fitness function, scores all individual equally...'''
return 1.0
# Dummy test to check if the code runs... it doesn't :(
ga = GeneticAlgorithm(I=10, G=3, D=5, U=10, fitness_function=my_fitness_function, run_parallel=True)
print(ga.pga())
任何类型的提示、代码片段或完整的解决方案都将受到欢迎。这在 R 中曾经相当容易,但是 Python 我显然已经无计可施了……谢谢!
ETA:修正了代码中的几个拼写错误并添加了 run_parallel
参数以表明它 运行 完全没问题 没有 并行化。哦,是的,还有,我 运行 在 Windows 上,否则我可能会尝试那个 Ray
库,有人告诉我,它可以创造奇迹,尤其是与 multiprocessing
相比时。
请参阅我对您的评论 post。此外,池中的每个进程都应该唯一地播种随机数生成器。否则,它们将生成相同的随机数序列。
请注意,由于创建进程池以及将参数和结果从一个进程的地址 space 传递到另一个地址 space 的开销,多处理不一定会更快 运行。 您的工作函数 sga
必须足够 CPU 密集才能使多处理具有优势,我现在明白实际上它是 。
from itertools import repeat
import multiprocessing as mp
import numpy as np
# this will be executed by each process in the pool:
def init_pool():
from threading import current_thread
ident = current_thread().ident
np.random.seed(ident)
def my_fitness_function(ind):
'''Dummy fitness function, scores all individual equally...'''
return 1.0
class GeneticAlgorithm:
def __init__(self, I, G, D, U, fitness_function, run_parallel):
self.fitness_function = fitness_function # User-supplied fitness function
self.D = D # Problem size (number of genes)
self.I = I # Population size (number of individuals)
self.G = G # Number of generations
self.U = U # Number of parallel populations
self.run_parallel = run_parallel
def sga(self):
'''One single-threaded genetic algorithm'''
# Activation rate is fixed at 0.5 for the sake of this MWE
pop = np.random.binomial(size=self.I * self.D, n=1, p=0.5).reshape(self.I, self.D) # Seed population
for g in range(self.G):
# fitness is computed for all individuals
fitpop = np.array([self.fitness_function(ind=ind) for ind in pop])
# fitness is scaled back to 100%
fitpop /= np.sum(fitpop)
# 2I parents are selected at random according to each individual's relative fitness score
parents = np.random.choice(range(self.I), size=2 * self.I, replace=True, p=fitpop).reshape(self.I, 2)
# Parents are crossed 2 by 2, with each pair producing exactly one offspring
crossover = np.array([np.random.choice(parents[i, :], size=self.D, replace=True) for i in range(self.I)]).reshape(self.I, self.D)
embryos = np.array([[pop[crossover[i, d], d] for d in range(self.D)] for i in range(self.I)]).reshape(self.I, self.D)
# Mutation rate is fixed at 1/D for the sake of this MWE
mutations = np.random.binomial(size=self.I * self.D, n=1, p=1 / self.D).reshape(self.I, self.D)
# "Mutated embryos" become fully fledged individuals and replace the parent generation
pop = (1 - mutations) * embryos + mutations * (1 - embryos)
# Individuals are aggregated gene-wise, with the average of active and inactive genes creating a ratio
return pop.mean(axis=0)
def pga(self):
'''Multiple parallel genetic algorithms'''
universes = np.zeros(self.U * self.D).reshape(self.U, self.D)
if self.run_parallel:
pool_size = min(mp.cpu_count(), self.U)
p = mp.Pool(pool_size, initializer=init_pool)
results = p.starmap(GeneticAlgorithm.sga, zip(repeat(self, self.U)))
p.close()
p.join()
for u in range(self.U):
universes[u, :] = results[u]
else:
for u in range(self.U):
universes[u, :] = self.sga()
# Multiple GAs are aggregated in a sort of "mean of means"
return universes.mean(axis=0)
if __name__ == '__main__':
# Dummy test to check if the code runs... it doesn't :(
ga = GeneticAlgorithm(I=10, G=3, D=5, U=10, fitness_function=my_fitness_function, run_parallel=True)
print(ga.pga())
打印:
[0.56 0.46 0.38 0.54 0.52]