Python 管道子进程在加入后挂起
Python piped subprocess hanging after join
希望这是一个相当容易回答的问题。我试图在我的程序执行时不断地向工作进程发送一组数据。问题是,当我尝试加入线程时,程序挂起。
我原以为 worker 可能终止了,我不需要 join
,但是当我删除对 join
的调用时,我的程序在最后挂起。
这是我的代码片段。我正试图通过以这种方式使用 worker 来规避大型酸洗操作,因为我之前 运行 遇到 Queue 对象的开销问题。
# Outside of the class definition if that matters here...
def RunTimeWorker(conn, timestep, total_timesteps):
print "Start worker", timestep, total_timesteps
while (timestep < total_timesteps):
data = conn.recv()
timestep = data[0]
print timestep, "DATA [" + str(data)
conn.close()
print "End worker"
以及调用它的 class 方法:
def Execute(self):
parent_conn, child_conn = Pipe()
p = Process(target=RunTimeTestingWorker,args=(child_conn,0,300))
p.start()
for timestep in xrange(300):
...
# Send required data over to worker
toProcessArr = [timestep,300,
# trace data
...,...]
parent_conn.send(toProcessArr)
...
p.join # program hangs here
#p.join -- program will hang at end if join is commented
这里我的时间步更新成功...
Start worker 0 300
0 DATA [[0, 300, ...]
1 DATA [[1, 300, ...]
2 DATA [[2, 300, ...]
...
299 DATA [[299, 300, ...] # max timesteps are 300
编辑
正如 David 所指出的那样,这是我的一个愚蠢错误。但是,他关于添加哨兵的评论非常有价值。
这是因为您的工作人员正在等待 timestep < total_timesteps
,其中 total_timesteps = 300
但 timestep = 299
(因为 timestep
在 xrange(300)
中,即 0.. 299).
此处更好的模式是在处理完成时发送某种标记值。例如,将工作人员更改为:
while True:
data = con.recv()
if data == "DONE":
break
然后关于制作人:
parent_conn.send("DONE")
p.join()
希望这是一个相当容易回答的问题。我试图在我的程序执行时不断地向工作进程发送一组数据。问题是,当我尝试加入线程时,程序挂起。
我原以为 worker 可能终止了,我不需要 join
,但是当我删除对 join
的调用时,我的程序在最后挂起。
这是我的代码片段。我正试图通过以这种方式使用 worker 来规避大型酸洗操作,因为我之前 运行 遇到 Queue 对象的开销问题。
# Outside of the class definition if that matters here...
def RunTimeWorker(conn, timestep, total_timesteps):
print "Start worker", timestep, total_timesteps
while (timestep < total_timesteps):
data = conn.recv()
timestep = data[0]
print timestep, "DATA [" + str(data)
conn.close()
print "End worker"
以及调用它的 class 方法:
def Execute(self):
parent_conn, child_conn = Pipe()
p = Process(target=RunTimeTestingWorker,args=(child_conn,0,300))
p.start()
for timestep in xrange(300):
...
# Send required data over to worker
toProcessArr = [timestep,300,
# trace data
...,...]
parent_conn.send(toProcessArr)
...
p.join # program hangs here
#p.join -- program will hang at end if join is commented
这里我的时间步更新成功...
Start worker 0 300
0 DATA [[0, 300, ...]
1 DATA [[1, 300, ...]
2 DATA [[2, 300, ...]
...
299 DATA [[299, 300, ...] # max timesteps are 300
编辑
正如 David 所指出的那样,这是我的一个愚蠢错误。但是,他关于添加哨兵的评论非常有价值。
这是因为您的工作人员正在等待 timestep < total_timesteps
,其中 total_timesteps = 300
但 timestep = 299
(因为 timestep
在 xrange(300)
中,即 0.. 299).
此处更好的模式是在处理完成时发送某种标记值。例如,将工作人员更改为:
while True:
data = con.recv()
if data == "DONE":
break
然后关于制作人:
parent_conn.send("DONE")
p.join()