Using Pygame with parallelism in Python

I'm trying to train a neural network to play an SMB1 game made with Pygame. To speed things up, I want to use parallel processing so that multiple copies of the game are played simultaneously, by different members of the population (and on different training data).

The root of my problem is that Pygame isn't inherently instance-based; that is, it only ever spawns a single window, with a single display object. Since I can't create multiple Pygame windows and display objects, one per process, the processes have to share a single display object. This leads to my first question: is there a way to have multiple instances of pygame, and if not, is there a (performance-light) way to draw to the display concurrently? a.k.a. each game draws to a different part of the overall window.

However, I don't really need every game to be rendered; I only care that at least one instance of the game is rendered so I can monitor its progress. My solution was to assign each game a process ID, and only the game with process ID 0 actually draws to the display. Concurrency problem solved! To do this, I used multiprocessing.Process:

processes = [];
genome_batches = np.array_split(genomes,self.runConfig.parallel_processes);
for i in range(self.runConfig.parallel_processes):
    process = multiprocessing.Process(target=self.eval_genome_batch_feedforward,args=(genome_batches[i],config,i));
    processes.append(process);
for process in processes:
    process.start();
for process in processes:
    process.join();

However, this caused its own problem when multiprocessing pickled its objects: AttributeError: Can't pickle local object 'RunnerConfig.__init__.<locals>.<lambda>' Note: config and RunnerConfig are two different things; the former comes from the neat library I'm using and is the config passed to the function, while the latter is my own class, an attribute of the class the processes are started from.

After some research, it seems that because I'm using a class method, multiprocessing pickles the class, which includes the aforementioned RunnerConfig, which contains lambda functions that can't be pickled. This is hard to work around, because those lambda functions are used specifically in self.eval_genome_batch. This leads to the second question: is it possible to use multiprocessing.Process in a way that doesn't require pickling the outer class, so that the lambda functions never get pickled?
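One way to sidestep this (a minimal sketch with hypothetical names, not the actual project code) is to make the Process target a module-level function: pickle serializes a top-level function by reference, so the surrounding class and its lambdas are never touched. The snippet below demonstrates the difference:

```python
import pickle

class RunnerConfigLike:
    """Hypothetical stand-in for RunnerConfig: it stores a lambda,
    which pickle cannot serialize."""
    def __init__(self):
        self.fitnessFromArray = lambda fitnesses: sum(fitnesses) / len(fitnesses)

# A module-level function is pickled by reference (just its qualified
# name), so using one as a Process target never drags the surrounding
# class, or its lambdas, across the process boundary.
def eval_batch_worker(genome_batch, process_num):
    # stand-in for the real per-batch evaluation
    return [(genome_id, genome) for genome_id, genome in genome_batch]

def can_pickle(obj):
    """Report whether pickle can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (AttributeError, pickle.PicklingError):
        return False
```

Here can_pickle(eval_batch_worker) is True while can_pickle(RunnerConfigLike()) is False with exactly the AttributeError from the traceback: the lambda is a local object that pickle cannot look up by name.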

Finally, after more research, it turns out that instead of multiprocessing, which uses pickle, I could use pathos.multiprocessing, which uses dill. Dill can pickle lambda functions. Hooray!

But no. There's one final kicker. pathos.multiprocessing only has .map and the .map-equivalent functions from multiprocessing, which doesn't let me control the processes themselves. That means that when the function is called, there is (afaik) no way to tell the program which process ID the game is being run from, and therefore no way to tell whether to render to the screen. So the final question is: is there a way to use pathos.multiprocessing.map (or, really, any library's parallel function) that a) doesn't break on lambda functions and b) can tell the called function which process ID is being used?
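One workaround worth noting (a sketch with hypothetical names, not tied to pathos): a map-style API can still be told "which worker am I" by passing the batch index in as an ordinary argument, for example via starmap with enumerate:

```python
import multiprocessing

# Hypothetical worker: the batch index arrives as a normal argument, so
# the function itself can decide whether it is the rendering batch.
def eval_batch(batch, process_num):
    render = (process_num == 0)              # only the index-0 batch would draw
    return process_num, render, [x * x for x in batch]  # squaring stands in for evaluation

def run_all(batches, workers=2):
    # enumerate() pairs every batch with its own index before the pool sees it
    with multiprocessing.Pool(workers) as pool:
        return pool.starmap(eval_batch, list(enumerate(batches)))

if __name__ == '__main__':
    print(run_all([[1, 2], [3, 4]]))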

Final note: I know the simplest answer is to just not render to Pygame at all. That would solve every problem. However, being able to watch the program progress and learn is very useful and important to me.

So, I have a list of different questions, any one of which, if solved, would fix everything:

Edit: here is most of the relevant code. Only the relevant methods are included because the classes are quite long. The source code can be found on my github if you want.

game_runner_neat.py: the class that starts the parallel processing

import neat
import baseGame
import runnerConfiguration
import os.path
import os
import visualize
import random
import numpy as np
#import concurrent.futures
import multiprocessing
from logReporting import LoggingReporter
from renderer import Renderer as RendererReporter
from videofig import videofig as vidfig
from neat.six_util import iteritems, itervalues

class GameRunner:

    #if using default version, create basic runner and specify game to run
    def __init__(self,game,runnerConfig):
        self.game = game;
        self.runConfig = runnerConfig;

    #skip some code


    #parallel version of eval_genomes_feedforward
    def eval_genome_batch_feedforward(self,genomes,config,processNum):
        for genome_id, genome in genomes:
            genome.fitness += self.eval_genome_feedforward(genome,config,processNum=processNum);

    
    def eval_training_data_batch_feedforward(self,genomes,config,data,processNum,lock):
        for datum in data:
            for genome_id,genome in genomes:
                genome.increment_fitness(lock,self.eval_genome_feedforward(genome,config,processNum=processNum,trainingDatum=datum)); #increment_fitness allows multiple threads to change the fitness of the same genome safely

    #evaluate a population with the game as a feedforward neural net
    def eval_genomes_feedforward(self, genomes, config):
        for genome_id,genome in genomes:
            genome.fitness = 0; #sanity check
        if (self.runConfig.training_data is None):
            if (self.runConfig.parallel):
                processes = [];
                genome_batches = np.array_split(genomes,self.runConfig.parallel_processes);
                for i in range(self.runConfig.parallel_processes):
                    process = multiprocessing.Process(target=self.eval_genome_batch_feedforward,args=(genome_batches[i],config,i));
                    processes.append(process);
                for process in processes:
                    process.start();
                for process in processes:
                    process.join();
                return;
            else:
                for genome_id, genome in genomes:
                    genome.fitness += self.eval_genome_feedforward(genome,config)
        else:
            if (self.runConfig.parallel):
                processes = [];
                data_batches = np.array_split(self.runConfig.training_data,self.runConfig.parallel_processes);
                lock = multiprocessing.Lock();
                for i in range(self.runConfig.parallel_processes):
                    process = multiprocessing.Process(target=self.eval_training_data_batch_feedforward,args=(genomes,config,data_batches[i],i,lock));
                    processes.append(process);
                    process.start();
                for process in processes:
                    process.join();
                return;
            else:
                for datum in self.runConfig.training_data:
                    for genome_id, genome in genomes:
                        genome.fitness += self.eval_genome_feedforward(genome,config,trainingDatum=datum)

runnerConfiguration.py (the class with the lambda functions, passed to GameRunner in init):

class RunnerConfig:

    def __init__(self,gameFitnessFunction,gameRunningFunction,logging=False,logPath='',recurrent=False,trial_fitness_aggregation='average',custom_fitness_aggregation=None,time_step=0.05,num_trials=10,parallel=False,returnData=[],gameName='game',num_generations=300,fitness_collection_type=None):

        self.logging = logging;
        self.logPath = logPath;
        self.generations = num_generations;
        self.recurrent = recurrent;
        self.gameName = gameName;
        self.parallel = parallel;
        self.time_step = time_step;
        self.numTrials = num_trials;
        self.fitnessFromGameData = gameFitnessFunction;
        self.gameStillRunning = gameRunningFunction;
        self.fitness_collection_type = fitness_collection_type;

        self.returnData = returnData;
##        for (datum in returnData):
##            if (isinstance(datum,IOData)):
##                [returnData.append(x) for x in datum.getSplitData()];
##            else:
##                returnData.append(datum);
##        
        if (trial_fitness_aggregation == 'custom'):
            self.fitnessFromArray = custom_fitness_aggregation;

        if (trial_fitness_aggregation == 'average'):
            self.fitnessFromArray = lambda fitnesses : sum(fitnesses)/len(fitnesses);

        if (trial_fitness_aggregation == 'max'):
            self.fitnessFromArray = lambda fitnesses : max(fitnesses);

        if (trial_fitness_aggregation == 'min'):
            self.fitnessFromArray = lambda fitnesses : min(fitnesses);
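For what it's worth, these lambdas could be made picklable by selecting module-level named functions from a dict, since those pickle by reference. A hypothetical, cut-down stand-in for RunnerConfig illustrating the idea:

```python
import pickle

# Module-level aggregation functions pickle by reference, so storing one
# of them on the config object no longer breaks multiprocessing.
def _average(fitnesses):
    return sum(fitnesses) / len(fitnesses)

_AGGREGATORS = {'average': _average, 'max': max, 'min': min}

class PicklableRunnerConfig:
    """Hypothetical, cut-down stand-in for RunnerConfig."""
    def __init__(self, trial_fitness_aggregation='average',
                 custom_fitness_aggregation=None):
        if trial_fitness_aggregation == 'custom':
            # still only picklable if the custom function is module-level
            self.fitnessFromArray = custom_fitness_aggregation
        else:
            self.fitnessFromArray = _AGGREGATORS[trial_fitness_aggregation]
```

An instance built this way round-trips through pickle.dumps/pickle.loads without the AttributeError.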

gameFitnessFunction and gameRunningFunction are functions passed in to customize training behavior.

When the program tries to run eval_genomes_feedforward with runnerConfig.parallel = True, I get the following full error message:

Traceback (most recent call last):
  File "c:/Users/harrison_truscott/Documents/GitHub/AI_game_router/Neat/smb1Py_runner.py", line 94, in <module>
    winner = runner.run(config,'run_' + str(currentRun));
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py", line 75, in run
    winner = pop.run(self.eval_genomes,self.runConfig.generations);
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\neat\population.py", line 102, in run
    fitness_function(list(iteritems(self.population)), self.config)
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py", line 204, in eval_genomes
    self.eval_genomes_feedforward(genomes,config);
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py", line 276, in eval_genomes_feedforward
    process.start();
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\context.py", line 326, in _Popen
    return Popen(process_obj)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'RunnerConfig.__init__.<locals>.<lambda>'

When the first process breaks, I get a second error message as the next process is interrupted by the first one's incomplete start:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py", line 125, in _main
    prepare(preparation_data)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py", line 236, in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py", line 287, in _fixup_main_from_path
    main_content = runpy.run_path(main_path,
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\runpy.py", line 263, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\runpy.py", line 96, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\smb1Py_runner.py", line 94, in <module>
    winner = runner.run(config,'run_' + str(currentRun));
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py", line 75, in run
    winner = pop.run(self.eval_genomes,self.runConfig.generations);
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\neat\population.py", line 102, in run
    fitness_function(list(iteritems(self.population)), self.config)
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py", line 204, in eval_genomes
    self.eval_genomes_feedforward(genomes,config);
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py", line 276, in eval_genomes_feedforward
    process.start();
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\context.py", line 326, in _Popen
    return Popen(process_obj)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\popen_spawn_win32.py", line 45, in __init__
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py", line 154, in get_preparation_data
    _check_not_importing_main()
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py", line 134, in _check_not_importing_main
    raise RuntimeError('''
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

As an aside, multiprocessing.freeze_support() is the first function I call when the main file runs.
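For reference, the idiom the error message asks for looks like this (a minimal sketch with a hypothetical main() function). On Windows the spawn start method re-imports the main module in every child, and it is the __main__ guard, not freeze_support() by itself, that stops the re-import from launching everything again:

```python
import multiprocessing

def main():
    # build the GameRunner, population, etc. and kick off training here
    ...

if __name__ == '__main__':
    # freeze_support() only matters for frozen executables; the guard is
    # what keeps child processes from re-running the whole script.
    multiprocessing.freeze_support()
    main()
```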

I'll try to address the main questions. My understanding of your actual problem is quite limited, since I don't know what your code actually does.

"A way to use pygame as multiple different instances in different threads, spawned from the same process"

This won't work, because pygame is built on top of SDL2, which states that "you should not expect to be able to create a window, render, or receive events on any thread other than the main one."

"A way to use pygame's display (and update clock) concurrently and safely"

Same as above; the display only works on the main thread.

"A way to use multiprocessing.Process that doesn't need to pickle the method's class but can still access class variables"

You could use something like dill to pickle the method, but copying python objects between processes feels wrong (to me). I would look for another solution.

"A multiprocessing library that:"

1. doesn't need to pickle lambda functions, or is able to

You need to serialize Python objects in order to send them between processes.

2. has a way to tell the subprocess which process worker is being used

I don't understand what this means.


It seems to me that the problem can be solved by better separating the data from the visualization. The training should know nothing about any visualization, since it doesn't depend on how you want to display it. So there shouldn't be any reason to share the pygame display.

Once that's done, it shouldn't be too hard to do what you want (multi-threading always causes problems, though). Regarding the pickle issue: I would try to avoid pickling Python objects and functions, and instead pass only basic primitives between threads and processes. It looks like you should be able to assign self.fitnessFromArray a simple int instead, and do the min/avg/max calculation in the thread/process based on its value.
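A minimal sketch of that suggestion, with a hypothetical helper name: the processes exchange a plain tag (a string here; an int works the same way), and the worker resolves it to the actual computation, so no function object ever needs to be pickled.

```python
# Hypothetical helper resolving a primitive tag to a computation; only
# the tag and the fitness numbers cross the process boundary.
def aggregate(fitnesses, mode):
    if mode == 'average':
        return sum(fitnesses) / len(fitnesses)
    if mode == 'max':
        return max(fitnesses)
    if mode == 'min':
        return min(fitnesses)
    raise ValueError('unknown aggregation mode: ' + mode)
```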

If you want to do threading, the main thread would handle the rendering. It would also spawn the threads for the training. When the threads finish, they would return their results (or put them in thread-safe storage), and the main thread would poll the data and display the result. If the training work takes longer than one frame, split it up so that each thread only does part of the training and can continue where it left off on the next frame.
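That threading layout might be sketched like this (hypothetical names; a queue.Queue serves as the thread-safe storage, and the main thread, which would own the pygame display, drains it):

```python
import queue
import threading

# Thread-safe storage for finished work; the main thread polls this.
results = queue.Queue()

def train_batch(batch_id, work):
    # stand-in for one slice of training
    results.put((batch_id, sum(work)))

def run_training(batches):
    threads = [threading.Thread(target=train_batch, args=(i, batch))
               for i, batch in enumerate(batches)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # The main thread collects the results; in the real program this is
    # where pygame rendering would happen, once per frame.
    collected = {}
    while not results.empty():
        batch_id, total = results.get()
        collected[batch_id] = total
    return collected
```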

If you want separate processes instead, the principle is the same. The main process starts several training processes and connects to them via sockets. From the sockets you can poll information about the state of the program and display it. It's basically a client-server architecture (albeit on localhost), where the training scripts are the servers and the main process is the client.
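A minimal sketch of that client-server idea, using multiprocessing.Pipe in place of real sockets to stay self-contained (hypothetical names):

```python
import multiprocessing

def trainer(conn, process_num):
    # Stand-in training server: sends one status update per "frame".
    for step in range(3):
        conn.send((process_num, step))
    conn.close()

def run_one_trainer(process_num=0):
    parent_end, child_end = multiprocessing.Pipe()
    proc = multiprocessing.Process(target=trainer, args=(child_end, process_num))
    proc.start()
    child_end.close()  # parent drops its copy so recv() can hit EOF
    updates = []
    while True:
        try:
            updates.append(parent_end.recv())  # the main loop would render these
        except EOFError:
            break
    proc.join()
    return updates

if __name__ == '__main__':
    print(run_one_trainer(0))
```

A real version would replace the Pipe with a socket per training process and poll with select or a timeout, but the division of labor is the same: trainers compute, the main process displays.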