子进程中的多处理 Class
Multiprocessing Class in Subprocess
我想在 class 中使用 python 的多处理模块,它本身使用子进程来不阻塞主调用。
最小示例如下所示:
import multiprocessing as mp
class mpo():
def __init__(self):
cpu = mp.cpu_count()
self.Pool = mp.Pool(processes = 2)
self.alive = True
self.p = mp.Process(target = self.sub,args=())
def worker():
print 'Alive'
def sub(self):
print self.alive
for i in range(2):
print i
self.Pool.apply_async(self.worker, args=())
print 'done'
self.Pool.close()
# self.Pool.join()
我评论了最后一行,因为它引发了断言错误(只能加入子进程)。
当我这样做时:
m =mpo()
m.p.start()
输出为
True
0
1
done
我的主要问题是,为什么工作线程中的打印语句永远不会到达?
更新:
更新后的代码如下所示。
import multiprocessing as mp
class mpo():
def __init__(self):
cpu = mp.cpu_count()
self.alive = True
self.p = mp.Process(target = self.sub,args=())
self.result=[]
def worker(self):
self.result.append(1)
print 'Alive'
def sub(self):
print self.alive
Pool = mp.Pool(processes = 2)
for i in range(2):
print i
Pool.apply_async(self.worker, args=())
print 'done'
Pool.close()
Pool.join()
池现在不必继承,因为它是在子进程中创建的。结果被附加到调用对象,而不是打印语句,并且池被正确连接。然而,没有结果显示。
所以我认为这可能对应于您正在寻找的一个简单示例:
import multiprocessing as mp
def worker(arg):
#print 'Alive'+str(arg)
return "Alive and finished {0}".format(arg)
class mpo():
def __init__(self):
cpu = mp.cpu_count()
self.alive = True
self.pool = mp.Pool(processes = 2)
def sub(self,arguments):
self.results=self.pool.map_async(worker, arguments)
return self.results
if __name__=="__main__":
s=mpo()
s.sub(range(10))
print s.results.get()
另外你可以调用
self.results.ready()
查明进程是否已完成其工作。您不必将它放在另一个进程中,因为 map_async 调用不会阻止程序的其余部分。
编辑:
关于您的评论,我没有真正看到将计算放在单独的进程中的价值,因为该函数已经 运行 在单独的进程中(在池中)。您只能通过将其嵌套在另一个子流程中来增加复杂性,但这是可能的:
import multiprocessing as mp
def worker(arg):
#print 'Alive'+str(arg)
return "Alive and finished {0}".format(arg)
class mpo():
def __init__(self):
cpu = mp.cpu_count()
self.alive = True
self.pool = mp.Pool(processes = 2)
def sub(self,arguments):
self.results=self.pool.map_async(worker, arguments)
return self.results
def run_calculation(q):
s=mpo()
results=s.sub(range(10))
q.put(results.get())
queue=mp.Queue()
proc=mp.Process(target=run_calculation,args=(queue,))
proc.start()
proc.join()
queue.get()
我想在 class 中使用 python 的多处理模块,它本身使用子进程来不阻塞主调用。
最小示例如下所示:
import multiprocessing as mp
class mpo():
def __init__(self):
cpu = mp.cpu_count()
self.Pool = mp.Pool(processes = 2)
self.alive = True
self.p = mp.Process(target = self.sub,args=())
def worker():
print 'Alive'
def sub(self):
print self.alive
for i in range(2):
print i
self.Pool.apply_async(self.worker, args=())
print 'done'
self.Pool.close()
# self.Pool.join()
我评论了最后一行,因为它引发了断言错误(只能加入子进程)。 当我这样做时:
m =mpo()
m.p.start()
输出为
True
0
1
done
我的主要问题是,为什么工作线程中的打印语句永远不会到达?
更新:
更新后的代码如下所示。
import multiprocessing as mp
class mpo():
def __init__(self):
cpu = mp.cpu_count()
self.alive = True
self.p = mp.Process(target = self.sub,args=())
self.result=[]
def worker(self):
self.result.append(1)
print 'Alive'
def sub(self):
print self.alive
Pool = mp.Pool(processes = 2)
for i in range(2):
print i
Pool.apply_async(self.worker, args=())
print 'done'
Pool.close()
Pool.join()
池现在不必继承,因为它是在子进程中创建的。结果被附加到调用对象,而不是打印语句,并且池被正确连接。然而,没有结果显示。
所以我认为这可能对应于您正在寻找的一个简单示例:
import multiprocessing as mp
def worker(arg):
#print 'Alive'+str(arg)
return "Alive and finished {0}".format(arg)
class mpo():
def __init__(self):
cpu = mp.cpu_count()
self.alive = True
self.pool = mp.Pool(processes = 2)
def sub(self,arguments):
self.results=self.pool.map_async(worker, arguments)
return self.results
if __name__=="__main__":
s=mpo()
s.sub(range(10))
print s.results.get()
另外你可以调用
self.results.ready()
查明进程是否已完成其工作。您不必将它放在另一个进程中,因为 map_async 调用不会阻止程序的其余部分。
编辑: 关于您的评论,我没有真正看到将计算放在单独的进程中的价值,因为该函数已经 运行 在单独的进程中(在池中)。您只能通过将其嵌套在另一个子流程中来增加复杂性,但这是可能的:
import multiprocessing as mp
def worker(arg):
#print 'Alive'+str(arg)
return "Alive and finished {0}".format(arg)
class mpo():
def __init__(self):
cpu = mp.cpu_count()
self.alive = True
self.pool = mp.Pool(processes = 2)
def sub(self,arguments):
self.results=self.pool.map_async(worker, arguments)
return self.results
def run_calculation(q):
s=mpo()
results=s.sub(range(10))
q.put(results.get())
queue=mp.Queue()
proc=mp.Process(target=run_calculation,args=(queue,))
proc.start()
proc.join()
queue.get()