子进程中的多处理 Class

Multiprocessing Class in Subprocess

我想在 class 中使用 python 的多处理模块,它本身使用子进程来不阻塞主调用。

最小示例如下所示:

import multiprocessing as mp


class mpo():

def __init__(self):
    cpu = mp.cpu_count()
    self.Pool = mp.Pool(processes = 2)
    self.alive = True
    self.p = mp.Process(target = self.sub,args=())

def worker():
    print 'Alive'

def sub(self):
    print self.alive

    for i in range(2):
        print i
        self.Pool.apply_async(self.worker, args=())
    print 'done'
    self.Pool.close()
#   self.Pool.join()

我评论了最后一行,因为它引发了断言错误(只能加入子进程)。 当我这样做时:

m =mpo()
m.p.start()

输出为

True
0
1
done

我的主要问题是,为什么工作线程中的打印语句永远不会到达?

更新:

更新后的代码如下所示。

import multiprocessing as mp


class mpo():

    def __init__(self):
        cpu = mp.cpu_count()
        self.alive = True
        self.p = mp.Process(target = self.sub,args=())
        self.result=[]

    def worker(self):
        self.result.append(1)
        print 'Alive'

    def sub(self):
        print self.alive
        Pool = mp.Pool(processes = 2)
        for i in range(2):
            print i
            Pool.apply_async(self.worker, args=())
        print 'done'
        Pool.close()
        Pool.join()

池现在不必继承,因为它是在子进程中创建的。结果被附加到调用对象,而不是打印语句,并且池被正确连接。然而,没有结果显示。

所以我认为这可能对应于您正在寻找的一个简单示例:

import multiprocessing as mp

def worker(arg):
    #print 'Alive'+str(arg)
    return "Alive and finished {0}".format(arg)

class mpo():
    def __init__(self):
        cpu = mp.cpu_count()
        self.alive = True
        self.pool = mp.Pool(processes = 2)
    def sub(self,arguments):
        self.results=self.pool.map_async(worker, arguments)
        return self.results

if __name__=="__main__":
    s=mpo()
    s.sub(range(10))
    print s.results.get()

另外你可以调用

self.results.ready()

查明进程是否已完成其工作。您不必将它放在另一个进程中,因为 map_async 调用不会阻止程序的其余部分。

编辑: 关于您的评论,我没有真正看到将计算放在单独的进程中的价值,因为该函数已经 运行 在单独的进程中(在池中)。您只能通过将其嵌套在另一个子流程中来增加复杂性,但这是可能的:

import multiprocessing as mp
def worker(arg):
    #print 'Alive'+str(arg)
    return "Alive and finished {0}".format(arg)
class mpo():
    def __init__(self):
        cpu = mp.cpu_count()
        self.alive = True
        self.pool = mp.Pool(processes = 2)
    def sub(self,arguments):
        self.results=self.pool.map_async(worker, arguments)
        return self.results
def run_calculation(q):
    s=mpo()
    results=s.sub(range(10))
    q.put(results.get())

queue=mp.Queue()
proc=mp.Process(target=run_calculation,args=(queue,))
proc.start()
proc.join()
queue.get()