遗传算法——在具有未绑定方法的 class 内部使用多处理

Genetic algorithms -- Using multiprocessing inside of a class having an unbound method

我最近一直在尝试通过求助于 class 本身内部的 multiprocessing 库来并行化我的一些代码(为了速度)是用户提供的函数,保存在 class 属性中)。根本没用。

上下文:我正在尝试并行化“并行遗传算法”class,顾名思义,它本身就是一个令人尴尬的并行问题。

据我所知,我的代码中存在 2 个问题。 (1) 用户提供的适应度函数不会在 Pool 对象生成的过程中导出,而且似乎无法解决任何深度复制问题。 (2) 另一个问题是,也许 Pool 对象不确定如何处理多维输出...我真的不确定这个 tbh。

我已经尝试编写我的代码的一个小型独立版本,只是为了让事情更清楚(目前 运行,因为存在错误,这就是重点) :

from itertools import repeat
import multiprocessing as mp
import numpy as np

class GeneticAlgorithm:
    def __init__(self, I, G, D, U, fitness_function, run_parallel):
        self.fitness_function = fitness_function # User-supplied fitness function
        self.D = D # Problem size (number of genes)
        self.I = I # Population size (number of individuals)
        self.G = G # Number of generations
        self.U = U # Number of parallel populations
        self.run_parallel = run_parallel

    def sga(self):
        '''One single-threaded genetic algorithm'''
        # Activation rate is fixed at 0.5 for the sake of this MWE
        pop = np.random.binomial(size=self.I * self.D, n=1, p=0.5).reshape(self.I, self.D) # Seed population

        for g in range(self.G):
            # fitness is computed for all individuals
            fitpop = np.array([self.fitness_function(ind=ind) for ind in pop])
            # fitness is scaled back to 100%
            fitpop /= np.sum(fitpop)
            # 2I parents are selected at random according to each individual's relative fitness score
            parents = np.random.choice(range(self.I), size=2 * self.I, replace=True, p=fitpop).reshape(self.I, 2)
            # Parents are crossed 2 by 2, with each pair producing exactly one offspring
            crossover = np.array([np.random.choice(parents[i, :], size=self.D, replace=True) for i in range(self.I)]).reshape(self.I, self.D)
            embryos = np.array([[pop[crossover[i, d], d] for d in range(self.D)] for i in range(self.I)]).reshape(self.I, self.D)
            # Mutation rate is fixed at 1/D for the sake of this MWE
            mutations = np.random.binomial(size=self.I * self.D, n=1, p=1 / self.D).reshape(self.I, self.D)
            # "Mutated embryos" become fully fledged individuals and replace the parent generation
            pop = (1 - mutations) * embryos + mutations * (1 - embryos)

        # Individuals are aggregated gene-wise, with the average of active and inactive genes creating a ratio
        return pop.mean(axis=0)

    def pga(self):
        '''Multiple parallel genetic algorithms'''
        if self.run_parallel:
            p = mp.Pool(mp.cpu_count())
            universes = p.starmap(GeneticAlgorithm.sga, zip(repeat(self, self.U)))
            p.close()
            p.join()
        else:
            universes = np.zeros(self.U * self.D).reshape(self.U, self.D)
            
            for u in range(self.U):
                universes[u, :] = self.sga()

        # Multiple GAs are aggregated in a sort of "mean of means"
        return universes.mean(axis=0)

if __name__ == '__main__':
    def my_fitness_function(ind):
        '''Dummy fitness function, scores all individual equally...'''
        return 1.0

    # Dummy test to check if the code runs... it doesn't :(
    ga = GeneticAlgorithm(I=10, G=3, D=5, U=10, fitness_function=my_fitness_function, run_parallel=True)
    print(ga.pga())

任何类型的提示、代码片段或完整的解决方案都将受到欢迎。这在 R 中曾经相当容易,但是 Python 我显然已经无计可施了……谢谢!

ETA:修正了代码中的几个拼写错误并添加了 run_parallel 参数以表明它 运行 完全没问题 没有 并行化。哦,是的,还有,我 运行 在 Windows 上,否则我可能会尝试那个 Ray 库,有人告诉我,它可以创造奇迹,尤其是与 multiprocessing 相比时。

请参阅我对您的评论 post。此外,池中的每个进程都应该唯一地播种随机数生成器。否则,它们将生成相同的随机数序列。

请注意,由于创建进程池以及将参数和结果从一个进程的地址 space 传递到另一个地址 space 的开销,多处理不一定会更快 运行。 您的工作函数 sga 必须足够 CPU 密集才能使多处理具有优势,我现在明白实际上它是

from itertools import repeat
import multiprocessing as mp
import numpy as np

# this will be executed by each process in the pool:
def init_pool():
    from threading import current_thread
    ident = current_thread().ident
    np.random.seed(ident)

def my_fitness_function(ind):
    '''Dummy fitness function, scores all individual equally...'''
    return 1.0

class GeneticAlgorithm:
    def __init__(self, I, G, D, U, fitness_function, run_parallel):
        self.fitness_function = fitness_function # User-supplied fitness function
        self.D = D # Problem size (number of genes)
        self.I = I # Population size (number of individuals)
        self.G = G # Number of generations
        self.U = U # Number of parallel populations
        self.run_parallel = run_parallel

    def sga(self):
        '''One single-threaded genetic algorithm'''
        # Activation rate is fixed at 0.5 for the sake of this MWE
        pop = np.random.binomial(size=self.I * self.D, n=1, p=0.5).reshape(self.I, self.D) # Seed population

        for g in range(self.G):
            # fitness is computed for all individuals
            fitpop = np.array([self.fitness_function(ind=ind) for ind in pop])
            # fitness is scaled back to 100%
            fitpop /= np.sum(fitpop)
            # 2I parents are selected at random according to each individual's relative fitness score
            parents = np.random.choice(range(self.I), size=2 * self.I, replace=True, p=fitpop).reshape(self.I, 2)
            # Parents are crossed 2 by 2, with each pair producing exactly one offspring
            crossover = np.array([np.random.choice(parents[i, :], size=self.D, replace=True) for i in range(self.I)]).reshape(self.I, self.D)
            embryos = np.array([[pop[crossover[i, d], d] for d in range(self.D)] for i in range(self.I)]).reshape(self.I, self.D)
            # Mutation rate is fixed at 1/D for the sake of this MWE
            mutations = np.random.binomial(size=self.I * self.D, n=1, p=1 / self.D).reshape(self.I, self.D)
            # "Mutated embryos" become fully fledged individuals and replace the parent generation
            pop = (1 - mutations) * embryos + mutations * (1 - embryos)

        # Individuals are aggregated gene-wise, with the average of active and inactive genes creating a ratio
        return pop.mean(axis=0)

    def pga(self):
        '''Multiple parallel genetic algorithms'''
        universes = np.zeros(self.U * self.D).reshape(self.U, self.D)
        if self.run_parallel:
            pool_size = min(mp.cpu_count(), self.U)
            p = mp.Pool(pool_size, initializer=init_pool)
            results = p.starmap(GeneticAlgorithm.sga, zip(repeat(self, self.U)))
            p.close()
            p.join()
            for u in range(self.U):
                universes[u, :] = results[u]
        else:
            for u in range(self.U):
                universes[u, :] = self.sga()

        # Multiple GAs are aggregated in a sort of "mean of means"
        return universes.mean(axis=0)

if __name__ == '__main__':
    # Dummy test to check if the code runs... it doesn't :(
    ga = GeneticAlgorithm(I=10, G=3, D=5, U=10, fitness_function=my_fitness_function, run_parallel=True)
    print(ga.pga())

打印:

[0.56 0.46 0.38 0.54 0.52]