Python: 模块中的多处理 Queue.put() 不会向父进程发送任何内容
Python: multiprocessing Queue.put() in module won't send anything to parent process
我正在尝试使用 Python 中的多处理包使 2 个进程相互通信,更准确地说是 Queue() class。从父进程,我想每 5 秒获取子进程的更新值。这个子进程是一个 class 函数。我做了一个玩具示例,一切正常。
然而,当我尝试在我的项目中实现这个解决方案时,子模块中子进程的 Queue.put() 方法似乎不会向父进程发送任何内容,因为父进程不会打印所需的值,代码永远不会停止 运行。实际上,父进程只打印发送给子进程的值,这里是True
,但正如我所说,从不停止。
所以我的问题是:
我的玩具示例有什么错误吗?
我应该如何修改我的项目才能让它像我的玩具示例一样工作?
玩具示例:作品
主模块
from multiprocessing import Process, Event, Lock, Queue, Pipe
import time
import test_mod as test
def loop(output):
stop_event = Event()
q = Queue()
child_process = Process(target=test.child.sub, args=(q,))
child_process.start()
i = 0
print("started at {} ".format(time.time()))
while not stop_event.is_set():
i+=1
time.sleep(5)
q.put(True)
print(q.get())
if i == 5:
child_process.terminate()
stop_event.set()
output.put("main process looped")
if __name__ == '__main__':
stop_event, output = Event(), Queue()
k = 0
while k < 5:
loop_process = Process(target=loop, args=(output,))
loop_process.start()
print(output.get())
loop_process.join()
k+=1
子模块
from multiprocessing import Process, Event, Lock, Queue, Pipe
import time
class child(object):
def __init__(self):
pass
def sub(q):
i = 0
while i < 2000:
latest_value = time.time()
accord = q.get()
if accord == True:
q.put(latest_value)
accord = False
time.sleep(0.0000000005)
i+=1
项目代码:不起作用
主模块
import neat #package in which the submodule is
import *some other stuff*
def run(config_file):
config = neat.Config(some configuration)
p = neat.Population(config)
**WHERE MY PROBLEM IS**
stop_event = Event()
q = Queue()
pe = neat.ParallelEvaluator(**args)
child_process = Process(target=p.run, args=(pe.evaluate, q, other args))
child_process.start()
i = 0
while not stop_event.is_set():
q.put(True)
print(q.get())
time.sleep(5)
i += 1
if i == 5:
child_process.terminate()
stop_event.set()
if __name__ == '__main__':
run(config_file)
子模块
class Population(object):
def __init__():
*initialization*
def run(self, q, other args):
while n is None or k < n:
*some stuff*
accord = add_2.get()
if accord == True:
add_2.put(self.best_genome.fitness)
accord = False
return self.best_genome
注意:
我不习惯多处理
考虑到整个代码太长,我已尝试给出项目中最相关的部分。
我也考虑过使用 Pipe(),但是这个选项也不起作用。
如果我没看错的话,你想要的子模块是 class Population
。但是,您使用 ParallelEvaluator
类型的参数启动您的进程。接下来,我看不到您将 Queue q
提供给子流程。这就是我从提供的代码中看到的:
stop_event = Event()
q = Queue()
pe = neat.ParallelEvaluator(**args)
child_process = Process(target=p.run, args=(pe.evaluate, **args)
child_process.start()
此外,以下行创建了竞争条件:
q.put(True)
print(q.get())
get
命令类似于 pop
。所以它需要一个元素并将其从队列中删除。如果您的子进程不访问这两行之间的队列(因为它很忙),则 True
永远不会进入子进程。因此,最好两个使用多个队列。每个方向一个。类似于:
stop_event = Event()
q_in = Queue()
q_out = Queue()
pe = neat.ParallelEvaluator(**args)
child_process = Process(target=p.run, args=(pe.evaluate, **args))
child_process.start()
i = 0
while not stop_event.is_set():
q_in.put(True)
print(q_out.get())
time.sleep(5)
i += 1
if i == 5:
child_process.terminate()
stop_event.set()
这是您的子模块
class Population(object):
def __init__():
*initialization*
def run(self, **args):
while n is None or k < n:
*some stuff*
accord = add_2.get() # add_2 = q_in
if accord == True:
add_3.put(self.best_genome.fitness) #add_3 = q_out
accord = False
return self.best_genome
我正在尝试使用 Python 中的多处理包使 2 个进程相互通信,更准确地说是 Queue() class。从父进程,我想每 5 秒获取子进程的更新值。这个子进程是一个 class 函数。我做了一个玩具示例,一切正常。
然而,当我尝试在我的项目中实现这个解决方案时,子模块中子进程的 Queue.put() 方法似乎不会向父进程发送任何内容,因为父进程不会打印所需的值,代码永远不会停止 运行。实际上,父进程只打印发送给子进程的值,这里是True
,但正如我所说,从不停止。
所以我的问题是:
我的玩具示例有什么错误吗?
我应该如何修改我的项目才能让它像我的玩具示例一样工作?
玩具示例:作品
主模块
from multiprocessing import Process, Event, Lock, Queue, Pipe
import time
import test_mod as test
def loop(output):
stop_event = Event()
q = Queue()
child_process = Process(target=test.child.sub, args=(q,))
child_process.start()
i = 0
print("started at {} ".format(time.time()))
while not stop_event.is_set():
i+=1
time.sleep(5)
q.put(True)
print(q.get())
if i == 5:
child_process.terminate()
stop_event.set()
output.put("main process looped")
if __name__ == '__main__':
stop_event, output = Event(), Queue()
k = 0
while k < 5:
loop_process = Process(target=loop, args=(output,))
loop_process.start()
print(output.get())
loop_process.join()
k+=1
子模块
from multiprocessing import Process, Event, Lock, Queue, Pipe
import time
class child(object):
def __init__(self):
pass
def sub(q):
i = 0
while i < 2000:
latest_value = time.time()
accord = q.get()
if accord == True:
q.put(latest_value)
accord = False
time.sleep(0.0000000005)
i+=1
项目代码:不起作用
主模块
import neat #package in which the submodule is
import *some other stuff*
def run(config_file):
config = neat.Config(some configuration)
p = neat.Population(config)
**WHERE MY PROBLEM IS**
stop_event = Event()
q = Queue()
pe = neat.ParallelEvaluator(**args)
child_process = Process(target=p.run, args=(pe.evaluate, q, other args))
child_process.start()
i = 0
while not stop_event.is_set():
q.put(True)
print(q.get())
time.sleep(5)
i += 1
if i == 5:
child_process.terminate()
stop_event.set()
if __name__ == '__main__':
run(config_file)
子模块
class Population(object):
def __init__():
*initialization*
def run(self, q, other args):
while n is None or k < n:
*some stuff*
accord = add_2.get()
if accord == True:
add_2.put(self.best_genome.fitness)
accord = False
return self.best_genome
注意:
我不习惯多处理
考虑到整个代码太长,我已尝试给出项目中最相关的部分。
我也考虑过使用 Pipe(),但是这个选项也不起作用。
如果我没看错的话,你想要的子模块是 class Population
。但是,您使用 ParallelEvaluator
类型的参数启动您的进程。接下来,我看不到您将 Queue q
提供给子流程。这就是我从提供的代码中看到的:
stop_event = Event()
q = Queue()
pe = neat.ParallelEvaluator(**args)
child_process = Process(target=p.run, args=(pe.evaluate, **args)
child_process.start()
此外,以下行创建了竞争条件:
q.put(True)
print(q.get())
get
命令类似于 pop
。所以它需要一个元素并将其从队列中删除。如果您的子进程不访问这两行之间的队列(因为它很忙),则 True
永远不会进入子进程。因此,最好两个使用多个队列。每个方向一个。类似于:
stop_event = Event()
q_in = Queue()
q_out = Queue()
pe = neat.ParallelEvaluator(**args)
child_process = Process(target=p.run, args=(pe.evaluate, **args))
child_process.start()
i = 0
while not stop_event.is_set():
q_in.put(True)
print(q_out.get())
time.sleep(5)
i += 1
if i == 5:
child_process.terminate()
stop_event.set()
这是您的子模块
class Population(object):
def __init__():
*initialization*
def run(self, **args):
while n is None or k < n:
*some stuff*
accord = add_2.get() # add_2 = q_in
if accord == True:
add_3.put(self.best_genome.fitness) #add_3 = q_out
accord = False
return self.best_genome