Python multiprocessing.pool 与 class objective 功能和神经进化的相互作用
Python multiprocessing.pool's interaction with a class objective function and neuro-evolution
警告,这会很长,因为我想尽可能具体。
确切问题:这是一个多处理问题。我已经确保我的 classes 在之前的实验中都表现得像 built/expected。
编辑:事先说过穿线。
当我 运行 在线程环境中测试我的问题示例时,一切正常;但是,当我过渡到真正的问题时,代码就会中断。具体来说,我收到 TypeError: can't pickle _thread.lock objects
错误。完整堆栈在底部。
我这里的线程需求与我改编代码的示例有点不同 -- https://github.com/CMA-ES/pycma/issues/31。在这个例子中,我们有一个适应度函数,可以被每个评估独立调用,none 个函数调用可以相互交互。然而,在我的实际问题中,我们正在尝试使用遗传算法优化神经网络权重。 GA 将建议潜在的权重,我们需要在我们的环境中评估这些 NN 控制器权重。在单线程情况下,我们可以只有一个环境,在该环境中我们使用简单的 for 循环评估权重:[nn.evaluate(weights) for weights in potential_candidates]
,找到表现最好的个体,并在下一轮变异中使用这些权重。但是,我们不能简单地在线程环境中进行一次模拟。
因此,我没有传递单个函数来进行评估,而是传递了一个函数列表(每个人一个,环境相同,但我们已经分叉了进程,以便通信流不个人之间没有互动。)
还有一件事需要立即注意:
我正在使用来自 neat
的并行评估数据结构
from neat.parallel import ParallelEvaluator # 使用 multiprocessing.Pool
玩具示例代码:
NPARAMS = nn.flat_init_weights.shape[0] # make this a 1000-dimensional problem.
NPOPULATION = 5 # use population size of 5.
MAX_ITERATION = 100 # run each solver for 100 function calls.
import time
from neat.parallel import ParallelEvaluator # uses multiprocessing.Pool
import cma
def fitness(x):
time.sleep(0.1)
return sum(x**2)
# # serial evaluation of all solutions
# def serial_evals(X, f=fitness, args=()):
# return [f(x, *args) for x in X]
# parallel evaluation of all solutions
def _evaluate2(self, weights, *args):
"""redefine evaluate without the dependencies on neat-internal data structures
"""
jobs = []
for i, w in enumerate(weights):
jobs.append(self.pool.apply_async(self.eval_function[i], (w, ) + args))
return [job.get() for job in jobs]
ParallelEvaluator.evaluate2 = _evaluate2
parallel_eval = ParallelEvaluator(12, [fitness]*NPOPULATION)
# time both
for eval_all in [parallel_eval.evaluate2]:
es = cma.CMAEvolutionStrategy(NPARAMS * [1], 1, {'maxiter': MAX_ITERATION,
'popsize': NPOPULATION})
es.disp_annotation()
while not es.stop():
X = es.ask()
es.tell(X, eval_all(X))
es.disp()
所需背景:
当我从玩具示例切换到真实代码时,上面的代码失败了。
我的class是:
LevelGenerator (simple GA class that implements mutate, etc)
GridGame (OpenAI wrapper; launches a Java server in which to run the simulation;
handles all communication between the Agent and the environment)
Agent (neural-network class, has an evaluate fn which uses the NN to play a single rollout)
Objective (handles serializing/de-serializing weights: numpy <--> torch; launching the evaluate function)
# The classes get composed to get the necessary behavior:
env = GridGame(Generator)
agent = NNAgent(env) # NNAgent is a subclass of (Random) Agent)
obj = PyTorchObjective(agent)
# My code normally all interacts like this in the single-threaded case:
def test_solver(solver): # Solver: CMA-ES, Differential Evolution, EvolutionStrategy, etc
history = []
for j in range(MAX_ITERATION):
solutions = solver.ask() #2d-numpy array. (POPSIZE x NPARAMS)
fitness_list = np.zeros(solver.popsize)
for i in range(solver.popsize):
fitness_list[i] = obj.function(solutions[i], len(solutions[i]))
solver.tell(fitness_list)
result = solver.result() # first element is the best solution, second element is the best fitness
history.append(result[1])
scores[j] = fitness_list
return history, result
所以,当我尝试 运行 时:
NPARAMS = nn.flat_init_weights.shape[0]
NPOPULATION = 5
MAX_ITERATION = 100
_x = NNAgent(GridGame(Generator))
gyms = [_x.mutate(0.0) for _ in range(NPOPULATION)]
objs = [PyTorchObjective(a) for a in gyms]
def evaluate(objective, weights):
return objective.fun(weights, len(weights))
import time
from neat.parallel import ParallelEvaluator # uses multiprocessing.Pool
import cma
def fitness(agent):
return agent.evalute()
# # serial evaluation of all solutions
# def serial_evals(X, f=fitness, args=()):
# return [f(x, *args) for x in X]
# parallel evaluation of all solutions
def _evaluate2(self, X, *args):
"""redefine evaluate without the dependencies on neat-internal data structures
"""
jobs = []
for i, x in enumerate(X):
jobs.append(self.pool.apply_async(self.eval_function[i], (x, ) + args))
return [job.get() for job in jobs]
ParallelEvaluator.evaluate2 = _evaluate2
parallel_eval = ParallelEvaluator(12, [obj.fun for obj in objs])
# obj.fun takes in the candidate weights, loads them into the NN, and then evaluates the NN in the environment.
# time both
for eval_all in [parallel_eval.evaluate2]:
es = cma.CMAEvolutionStrategy(NPARAMS * [1], 1, {'maxiter': MAX_ITERATION,
'popsize': NPOPULATION})
es.disp_annotation()
while not es.stop():
X = es.ask()
es.tell(X, eval_all(X, NPARAMS))
es.disp()
我收到以下错误:
TypeError Traceback (most recent call last)
<ipython-input-57-3e6b7bf6f83a> in <module>
6 while not es.stop():
7 X = es.ask()
----> 8 es.tell(X, eval_all(X, NPARAMS))
9 es.disp()
<ipython-input-55-2182743d6306> in _evaluate2(self, X, *args)
14 jobs.append(self.pool.apply_async(self.eval_function[i], (x, ) + args))
15
---> 16 return [job.get() for job in jobs]
<ipython-input-55-2182743d6306> in <listcomp>(.0)
14 jobs.append(self.pool.apply_async(self.eval_function[i], (x, ) + args))
15
---> 16 return [job.get() for job in jobs]
~/miniconda3/envs/thesis/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
655 return self._value
656 else:
--> 657 raise self._value
658
659 def _set(self, i, obj):
~/miniconda3/envs/thesis/lib/python3.7/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
429 break
430 try:
--> 431 put(task)
432 except Exception as e:
433 job, idx = task[:2]
~/miniconda3/envs/thesis/lib/python3.7/multiprocessing/connection.py in send(self, obj)
204 self._check_closed()
205 self._check_writable()
--> 206 self._send_bytes(_ForkingPickler.dumps(obj))
207
208 def recv_bytes(self, maxlength=None):
~/miniconda3/envs/thesis/lib/python3.7/multiprocessing/reduction.py in dumps(cls, obj, protocol)
49 def dumps(cls, obj, protocol=None):
50 buf = io.BytesIO()
---> 51 cls(buf, protocol).dump(obj)
52 return buf.getbuffer()
53
TypeError: can't pickle _thread.lock objects
我还在这里读到,这可能是由于这是一个 class 函数 -- -- 所以我创建了全局范围的适应度函数 def fitness(agent): return agent.evalute()
,但这也没有用。
我认为这个错误可能是因为最初,我在 PyTorchObjective class 中将评估函数作为 lambda 函数,但是当我更改它时它仍然失败。
任何见解都将不胜感激,并感谢您阅读这面巨大的文字墙。
您没有使用多线程。 You are using multiple processes.
您传递给 apply_async
的所有参数,包括函数本身,都在后台进行序列化(腌制)并通过 IPC 通道传递给工作进程(阅读 multiprocessing
documentation 了解详细信息).因此,您不能传递任何与本质上是过程本地的事物相关联的实体。这包括大多数同步原语,因为它们必须使用锁来执行原子操作。
每当这种情况发生时 (as many other questions on this error message show),您可能太聪明了,将一个已经内置了并行化逻辑的对象传递给并行化框架。
如果你想用这样的 "parallelized object" 创建 "multiple levels of parallelization",你最好选择:
- 正确使用该对象的并行化机制,而不用担心多个级别:你一次做的事情不能超过你拥有的内核;或
- 在工作进程中创建和使用这些 "parallelized objects"
- 但您可能会在这里遇到
multiprocessing
限制,因为它的工作进程被故意禁止产生自己的池。
- 所以对于这种情况,a more advanced 3rd-party distributed work queue solution 可能更可取。
警告,这会很长,因为我想尽可能具体。
确切问题:这是一个多处理问题。我已经确保我的 classes 在之前的实验中都表现得像 built/expected。
编辑:事先说过穿线。
当我 运行 在线程环境中测试我的问题示例时,一切正常;但是,当我过渡到真正的问题时,代码就会中断。具体来说,我收到 TypeError: can't pickle _thread.lock objects
错误。完整堆栈在底部。
我这里的线程需求与我改编代码的示例有点不同 -- https://github.com/CMA-ES/pycma/issues/31。在这个例子中,我们有一个适应度函数,可以被每个评估独立调用,none 个函数调用可以相互交互。然而,在我的实际问题中,我们正在尝试使用遗传算法优化神经网络权重。 GA 将建议潜在的权重,我们需要在我们的环境中评估这些 NN 控制器权重。在单线程情况下,我们可以只有一个环境,在该环境中我们使用简单的 for 循环评估权重:[nn.evaluate(weights) for weights in potential_candidates]
,找到表现最好的个体,并在下一轮变异中使用这些权重。但是,我们不能简单地在线程环境中进行一次模拟。
因此,我没有传递单个函数来进行评估,而是传递了一个函数列表(每个人一个,环境相同,但我们已经分叉了进程,以便通信流不个人之间没有互动。)
还有一件事需要立即注意: 我正在使用来自 neat
的并行评估数据结构from neat.parallel import ParallelEvaluator # 使用 multiprocessing.Pool
玩具示例代码:
NPARAMS = nn.flat_init_weights.shape[0] # make this a 1000-dimensional problem.
NPOPULATION = 5 # use population size of 5.
MAX_ITERATION = 100 # run each solver for 100 function calls.
import time
from neat.parallel import ParallelEvaluator # uses multiprocessing.Pool
import cma
def fitness(x):
time.sleep(0.1)
return sum(x**2)
# # serial evaluation of all solutions
# def serial_evals(X, f=fitness, args=()):
# return [f(x, *args) for x in X]
# parallel evaluation of all solutions
def _evaluate2(self, weights, *args):
"""redefine evaluate without the dependencies on neat-internal data structures
"""
jobs = []
for i, w in enumerate(weights):
jobs.append(self.pool.apply_async(self.eval_function[i], (w, ) + args))
return [job.get() for job in jobs]
ParallelEvaluator.evaluate2 = _evaluate2
parallel_eval = ParallelEvaluator(12, [fitness]*NPOPULATION)
# time both
for eval_all in [parallel_eval.evaluate2]:
es = cma.CMAEvolutionStrategy(NPARAMS * [1], 1, {'maxiter': MAX_ITERATION,
'popsize': NPOPULATION})
es.disp_annotation()
while not es.stop():
X = es.ask()
es.tell(X, eval_all(X))
es.disp()
所需背景:
当我从玩具示例切换到真实代码时,上面的代码失败了。
我的class是:
LevelGenerator (simple GA class that implements mutate, etc)
GridGame (OpenAI wrapper; launches a Java server in which to run the simulation;
handles all communication between the Agent and the environment)
Agent (neural-network class, has an evaluate fn which uses the NN to play a single rollout)
Objective (handles serializing/de-serializing weights: numpy <--> torch; launching the evaluate function)
# The classes get composed to get the necessary behavior:
env = GridGame(Generator)
agent = NNAgent(env) # NNAgent is a subclass of (Random) Agent)
obj = PyTorchObjective(agent)
# My code normally all interacts like this in the single-threaded case:
def test_solver(solver): # Solver: CMA-ES, Differential Evolution, EvolutionStrategy, etc
history = []
for j in range(MAX_ITERATION):
solutions = solver.ask() #2d-numpy array. (POPSIZE x NPARAMS)
fitness_list = np.zeros(solver.popsize)
for i in range(solver.popsize):
fitness_list[i] = obj.function(solutions[i], len(solutions[i]))
solver.tell(fitness_list)
result = solver.result() # first element is the best solution, second element is the best fitness
history.append(result[1])
scores[j] = fitness_list
return history, result
所以,当我尝试 运行 时:
NPARAMS = nn.flat_init_weights.shape[0]
NPOPULATION = 5
MAX_ITERATION = 100
_x = NNAgent(GridGame(Generator))
gyms = [_x.mutate(0.0) for _ in range(NPOPULATION)]
objs = [PyTorchObjective(a) for a in gyms]
def evaluate(objective, weights):
return objective.fun(weights, len(weights))
import time
from neat.parallel import ParallelEvaluator # uses multiprocessing.Pool
import cma
def fitness(agent):
return agent.evalute()
# # serial evaluation of all solutions
# def serial_evals(X, f=fitness, args=()):
# return [f(x, *args) for x in X]
# parallel evaluation of all solutions
def _evaluate2(self, X, *args):
"""redefine evaluate without the dependencies on neat-internal data structures
"""
jobs = []
for i, x in enumerate(X):
jobs.append(self.pool.apply_async(self.eval_function[i], (x, ) + args))
return [job.get() for job in jobs]
ParallelEvaluator.evaluate2 = _evaluate2
parallel_eval = ParallelEvaluator(12, [obj.fun for obj in objs])
# obj.fun takes in the candidate weights, loads them into the NN, and then evaluates the NN in the environment.
# time both
for eval_all in [parallel_eval.evaluate2]:
es = cma.CMAEvolutionStrategy(NPARAMS * [1], 1, {'maxiter': MAX_ITERATION,
'popsize': NPOPULATION})
es.disp_annotation()
while not es.stop():
X = es.ask()
es.tell(X, eval_all(X, NPARAMS))
es.disp()
我收到以下错误:
TypeError Traceback (most recent call last)
<ipython-input-57-3e6b7bf6f83a> in <module>
6 while not es.stop():
7 X = es.ask()
----> 8 es.tell(X, eval_all(X, NPARAMS))
9 es.disp()
<ipython-input-55-2182743d6306> in _evaluate2(self, X, *args)
14 jobs.append(self.pool.apply_async(self.eval_function[i], (x, ) + args))
15
---> 16 return [job.get() for job in jobs]
<ipython-input-55-2182743d6306> in <listcomp>(.0)
14 jobs.append(self.pool.apply_async(self.eval_function[i], (x, ) + args))
15
---> 16 return [job.get() for job in jobs]
~/miniconda3/envs/thesis/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
655 return self._value
656 else:
--> 657 raise self._value
658
659 def _set(self, i, obj):
~/miniconda3/envs/thesis/lib/python3.7/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
429 break
430 try:
--> 431 put(task)
432 except Exception as e:
433 job, idx = task[:2]
~/miniconda3/envs/thesis/lib/python3.7/multiprocessing/connection.py in send(self, obj)
204 self._check_closed()
205 self._check_writable()
--> 206 self._send_bytes(_ForkingPickler.dumps(obj))
207
208 def recv_bytes(self, maxlength=None):
~/miniconda3/envs/thesis/lib/python3.7/multiprocessing/reduction.py in dumps(cls, obj, protocol)
49 def dumps(cls, obj, protocol=None):
50 buf = io.BytesIO()
---> 51 cls(buf, protocol).dump(obj)
52 return buf.getbuffer()
53
TypeError: can't pickle _thread.lock objects
我还在这里读到,这可能是由于这是一个 class 函数 -- def fitness(agent): return agent.evalute()
,但这也没有用。
我认为这个错误可能是因为最初,我在 PyTorchObjective class 中将评估函数作为 lambda 函数,但是当我更改它时它仍然失败。
任何见解都将不胜感激,并感谢您阅读这面巨大的文字墙。
您没有使用多线程。 You are using multiple processes.
您传递给 apply_async
的所有参数,包括函数本身,都在后台进行序列化(腌制)并通过 IPC 通道传递给工作进程(阅读 multiprocessing
documentation 了解详细信息).因此,您不能传递任何与本质上是过程本地的事物相关联的实体。这包括大多数同步原语,因为它们必须使用锁来执行原子操作。
每当这种情况发生时 (as many other questions on this error message show),您可能太聪明了,将一个已经内置了并行化逻辑的对象传递给并行化框架。
如果你想用这样的 "parallelized object" 创建 "multiple levels of parallelization",你最好选择:
- 正确使用该对象的并行化机制,而不用担心多个级别:你一次做的事情不能超过你拥有的内核;或
- 在工作进程中创建和使用这些 "parallelized objects"
- 但您可能会在这里遇到
multiprocessing
限制,因为它的工作进程被故意禁止产生自己的池。 - 所以对于这种情况,a more advanced 3rd-party distributed work queue solution 可能更可取。
- 但您可能会在这里遇到