可以将 python 多处理队列传递给子进程吗?
Can a python Multiprocessing queue be passed to the child process?
我在 python 中编写的数据采集系统中有一个大数据集,它需要无限长的时间才能将队列从子进程传递到父进程。我想保存在采集结束时采集的数据,并尝试使用 Multiprocessing
中的 queue
函数。如果我可以通过 queue
将消息从父级传递给子级以在我终止子进程之前保存我的数据,而不是这样做,我会更喜欢它。这可能吗?我认为它可能看起来像的一个例子是:
def acquireData(self, var1, queue):
import h5py
# Put my acquisition code here
queue.get()
if queue == True:
f = h5py.File("FileName","w")
f.create_dataset('Data',data=data)
f.close()
if __name__ == '__main__':
from multiprocessing import Process, Queue
queue = Queue()
inter_thread = Process(target=acquireData, args=(var1,queue))
queue.put(False)
inter_thread.start()
while True:
if not args.automate:
# Let c++ threads run for given amount of time
# Wait for stop from OP GUI
else:
queue.put(True)
break
print("Acquisition finished, cleaning up...")
sleep(2)
inter_thread.terminate()
这是允许的吗?如果允许进程之间的这种类型的接口,那么我有正确的表示法吗?对于一些参考,我在我试图保存的数组中有 9e7 个数据点的顺序,我有 7 个数组,通过将这些数组放入 queue
.谢谢。
首先,是的,将队列传递给 child 不仅合法,而且是队列的主要用例。请参阅 the first example in the docs,正是这样做的。
但是,您的代码存在一些问题:
queue.get()
if queue == True:
首先,您的 queue
永远不会是布尔值 True
,而是 Queue
。您几乎不想在 Python 中检查 if x == True:
;你想检查 if x:
。例如,if [1, 2]:
会通过,而 if [1, 2] == True:
不会。
其次,您的 queue
甚至不是您首先要检查的内容。它不是真实的或虚假的(或者它是否是不相关的);它是主进程放在队列中的值,你退出了它,要么是真的,要么是假的。你一拿到它就把它扔掉了。
所以,这样做:
flag = queue.get()
if flag:
或者,更简单地说:
if queue.get():
我不确定这是否正是您想要的。 queue.get()
将永远阻塞,直到主进程将某些东西放在那里。那是你想要的吗?如果是这样,那太好了;您已经完成了这部分代码。如果没有,你需要想想你想要什么。
按照设计,parent 将始终等待 2 秒,即使 child 在此之前很久就已完成。更好的解决方案是 join
与 child 超时 2 秒。然后你可以terminate
它如果超时。
另外,你确定你设计的终止行为是你想要的吗?您正在对队列执行 "soft kill request",然后等待 2 秒,然后对 terminate
执行 "medium-hard kill request",并且永远不会对 kill
执行 "hard kill"。这可能是一个完全合理的设计——但如果这不是你的设计,那你就实施了错误的东西。
我在 python 中编写的数据采集系统中有一个大数据集,它需要无限长的时间才能将队列从子进程传递到父进程。我想保存在采集结束时采集的数据,并尝试使用 Multiprocessing
中的 queue
函数。如果我可以通过 queue
将消息从父级传递给子级以在我终止子进程之前保存我的数据,而不是这样做,我会更喜欢它。这可能吗?我认为它可能看起来像的一个例子是:
def acquireData(self, var1, queue):
import h5py
# Put my acquisition code here
queue.get()
if queue == True:
f = h5py.File("FileName","w")
f.create_dataset('Data',data=data)
f.close()
if __name__ == '__main__':
from multiprocessing import Process, Queue
queue = Queue()
inter_thread = Process(target=acquireData, args=(var1,queue))
queue.put(False)
inter_thread.start()
while True:
if not args.automate:
# Let c++ threads run for given amount of time
# Wait for stop from OP GUI
else:
queue.put(True)
break
print("Acquisition finished, cleaning up...")
sleep(2)
inter_thread.terminate()
这是允许的吗?如果允许进程之间的这种类型的接口,那么我有正确的表示法吗?对于一些参考,我在我试图保存的数组中有 9e7 个数据点的顺序,我有 7 个数组,通过将这些数组放入 queue
.谢谢。
首先,是的,将队列传递给 child 不仅合法,而且是队列的主要用例。请参阅 the first example in the docs,正是这样做的。
但是,您的代码存在一些问题:
queue.get()
if queue == True:
首先,您的 queue
永远不会是布尔值 True
,而是 Queue
。您几乎不想在 Python 中检查 if x == True:
;你想检查 if x:
。例如,if [1, 2]:
会通过,而 if [1, 2] == True:
不会。
其次,您的 queue
甚至不是您首先要检查的内容。它不是真实的或虚假的(或者它是否是不相关的);它是主进程放在队列中的值,你退出了它,要么是真的,要么是假的。你一拿到它就把它扔掉了。
所以,这样做:
flag = queue.get()
if flag:
或者,更简单地说:
if queue.get():
我不确定这是否正是您想要的。 queue.get()
将永远阻塞,直到主进程将某些东西放在那里。那是你想要的吗?如果是这样,那太好了;您已经完成了这部分代码。如果没有,你需要想想你想要什么。
按照设计,parent 将始终等待 2 秒,即使 child 在此之前很久就已完成。更好的解决方案是 join
与 child 超时 2 秒。然后你可以terminate
它如果超时。
另外,你确定你设计的终止行为是你想要的吗?您正在对队列执行 "soft kill request",然后等待 2 秒,然后对 terminate
执行 "medium-hard kill request",并且永远不会对 kill
执行 "hard kill"。这可能是一个完全合理的设计——但如果这不是你的设计,那你就实施了错误的东西。