Python multiprocessing in objects

I'm writing a program in which a variable number of Agent objects concurrently run several serial methods and store their return values in a queue attribute. Each Agent has a single Worker (a subclass of Process) as an attribute and continuously feeds it jobs to run through a cmd_queue; the Agent collects results from its Worker through a res_queue. These are currently Manager().Queue() instances and lead to:

    TypeError: Pickling an AuthenticationString object is disallowed for security reasons

However, if I use a regular Queue.Queue instead, the Worker gets a copy of the Agent's cmd_queue and never sees anything the Agent adds to it (it always appears empty).

I can pickle instance methods using the solution referenced in this question: Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map()

from multiprocessing import Manager, Process
from time import sleep
import types
import copy_reg

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class Worker(Process):
    def __init__(self, cmd_queue, res_queue):
        self.cmd_queue = cmd_queue
        self.res_queue = res_queue
        Process.__init__(self)

    def run(self):
        while True:
            f, args, kwargs = self.cmd_queue.get()
            self.res_queue.put( f(*args, **kwargs) )  

class Agent:
    def __init__(self):
        self.cmd_queue = Manager().Queue()
        self.res_queue = Manager().Queue()
        self.worker = Worker(self.cmd_queue, self.res_queue)
        self.worker.start()

    def produce(self, f, *args, **kwargs):
        self.cmd_queue.put((f, args, kwargs))

    def do_some_work(self):
        self.produce(self.foo, waka='waka')

    def do_some_other_work(self):
        self.produce(self.bar, humana='humana')

    def foo(self, **kwargs):
        sleep(5)
        return('this is a foo')

    def bar(self, **kwargs):
        sleep(10)
        return('this is a bar')

    def get_results(self):  #blocking call
        res = []
        while not self.cmd_queue.empty():#wait for Worker to finish
            sleep(.5)
        while not self.res_queue.empty():
            res.append(self.res_queue.get())
        return res  

#This is the interface I'm looking for.
if __name__=='__main__':
    agents = [Agent() for i in range(50)]
    #this should flow quickly as the calls are added to cmd_queues
    for agent in agents:        
        agent.do_some_work()
        agent.do_some_other_work()  
    for agent in agents:
        print(agent.get_results())

My question is: how can I make this code work with multiprocessing, or is there a better, more accepted way to make this pattern work? This is a small piece of a larger framework, so I'd like it to be as object-oriented-friendly as possible.

Edit: This is on Python 2.7.

Would you be satisfied if a very mild fork of multiprocessing could make this pattern work? If so, you only have to look a little further down in the link mentioned in the question: Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map().

pathos.multiprocessing has a Pool that can pickle instance methods in a very clean way, so you can code just as you would in serial Python… and it just works… even directly from the interpreter.

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> from Queue import Queue
>>> from time import sleep
>>> 
>>> class Agent:
...   def __init__(self):
...     self.pool = Pool()
...     self.queue = Queue()
...   def produce(self, f, *args, **kwds):
...     self.queue.put(self.pool.apipe(f, *args, **kwds))
...   def do_some_work(self):
...     self.produce(self.foo, waka='waka')
...   def do_some_other_work(self):
...     self.produce(self.bar, humana='humana')
...   def foo(self, **kwds):
...     sleep(5)
...     return 'this is a foo'
...   def bar(self, **kwds):
...     sleep(10) 
...     return 'this is a bar'
...   def get_results(self):
...     res = []
...     while not self.queue.empty():
...       res.append(self.queue.get().get())
...     return res
... 
>>> agents = [Agent() for i in range(50)]
>>> for agent in agents:
...   agent.do_some_work()
...   agent.do_some_other_work()
... 
>>> for agent in agents:
...   print(agent.get_results())
... 
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
... (the same line is printed once for each of the 50 agents)
>>> 

Get pathos here: https://github.com/uqfoundation
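For comparison (this is not the author's pathos code), the same shape can be approximated in Python 3 with the standard library alone: a concurrent.futures future plays the role of the handle returned by pool.apipe. A hedged sketch, using a ThreadPoolExecutor so no method pickling is needed, and without the sleeps:

```python
from concurrent.futures import ThreadPoolExecutor
from queue import Queue

class Agent:
    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)
        self.queue = Queue()  # holds Future objects, like the apipe handles

    def produce(self, f, *args, **kwds):
        # submit() returns a Future immediately, like pool.apipe()
        self.queue.put(self.pool.submit(f, *args, **kwds))

    def foo(self, **kwds):
        return 'this is a foo'

    def bar(self, **kwds):
        return 'this is a bar'

    def get_results(self):
        res = []
        while not self.queue.empty():
            res.append(self.queue.get().result())  # .result() blocks, like .get()
        return res

agent = Agent()
agent.produce(agent.foo, waka='waka')
agent.produce(agent.bar, humana='humana')
print(agent.get_results())  # ['this is a foo', 'this is a bar']
```

Threads sidestep the pickling problem entirely, of course; pathos is what lets the same style work across real processes.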

You can do this with a plain multiprocessing.Queue. You just need to adjust the Agent class so that it doesn't try to pickle its Queue instances when the Agent instance itself gets pickled. This is necessary because pickling the instance methods you send to the Worker requires pickling the Agent instance itself. It's easy to do, though:

class Agent(object): # Agent is now a new-style class
    def __init__(self):
        self.cmd_queue = Queue()
        self.res_queue = Queue()
        self.worker = Worker(self.cmd_queue, self.res_queue)
        self.worker.start()

    def __getstate__(self):
        """ This is called to pickle the instance """
        self_dict = self.__dict__.copy()
        del self_dict['cmd_queue']
        del self_dict['res_queue']
        del self_dict['worker']
        return self_dict

    def __setstate__(self, self_dict):
        """ This is called to unpickle the instance. """
        self.__dict__ = self_dict

    ... # The rest is the same.
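The mechanics of that hook pair can be seen with pickle alone; a minimal Python 3 sketch, where a Lock stands in for the unpicklable Queue/Process attributes:

```python
import pickle
import threading

class Agent(object):
    def __init__(self):
        self.name = 'agent-1'
        self.lock = threading.Lock()  # unpicklable, like a Queue or Process

    def __getstate__(self):
        # Called when the instance is pickled: drop the unpicklable attributes
        state = self.__dict__.copy()
        del state['lock']
        return state

    def __setstate__(self, state):
        # Called when the instance is unpickled (e.g. in the child process)
        self.__dict__ = state

clone = pickle.loads(pickle.dumps(Agent()))
print(clone.name)              # 'agent-1'
print(hasattr(clone, 'lock'))  # False -- the lock was stripped
```

The child's copy of the Agent simply has no queues or worker; it only needs the attributes the method being called actually uses.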

Note that there are a few other logic problems in this code that keep it from running correctly; get_results doesn't really do what you expect, because it's vulnerable to a race condition:

    while not self.cmd_queue.empty():#wait for Worker to finish
        sleep(.5)
    while not self.res_queue.empty():
        res.append(self.res_queue.get())

The cmd_queue can (and, with your example code, does) end up empty before the functions you actually passed to it have finished running in the Worker, which means some of your results will be missing when you pull everything out of res_queue. You can fix this with a JoinableQueue, which lets the worker actually signal when it's finished.
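JoinableQueue gives multiprocessing queues the same task_done()/join() protocol that queue.Queue offers; a single-process, thread-based Python 3 sketch of the handshake (names are illustrative):

```python
import threading
from queue import Queue

cmd_queue = Queue()
results = []

def worker():
    while True:
        item = cmd_queue.get()
        if item is None:           # sentinel: stop
            cmd_queue.task_done()
            break
        results.append(item * 2)
        cmd_queue.task_done()      # signals this item is fully processed

threading.Thread(target=worker).start()
for n in (1, 2, 3):
    cmd_queue.put(n)
cmd_queue.put(None)
cmd_queue.join()  # blocks until task_done() has been called for every put()
print(results)    # [2, 4, 6]
```

The key point is that join() tracks completed work, not queue emptiness, so it cannot return while an item is still being processed.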

You should also send a sentinel to the worker processes so that they shut down properly and all of their results get flushed from res_queue and sent back to the parent correctly. I found I also needed to add a sentinel to res_queue, because otherwise res_queue would sometimes appear empty in the parent before the last result written by the child had actually been flushed through the pipe, which meant the last results got dropped.
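A worker can consume such a sentinel with the two-argument form of iter(), which calls its first argument repeatedly until the return value equals the sentinel; a minimal single-process sketch with a plain queue:

```python
from queue import Queue

q = Queue()
for cmd in ('foo', 'bar'):
    q.put(cmd)
q.put(None)  # sentinel: tells the consumer no more work is coming

seen = []
for cmd in iter(q.get, None):  # keeps calling q.get() until it returns None
    seen.append(cmd)
print(seen)  # ['foo', 'bar']
```

This is exactly the loop shape used in Worker.run below, just with (None, (), {}) as the sentinel so it unpacks like a normal (f, args, kwargs) command.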

Here's a complete working example:

from multiprocessing import Process, Queue, JoinableQueue
import types
from time import sleep
import copy_reg  

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class Worker(Process):
    def __init__(self, cmd_queue, res_queue):
        self.cmd_queue = cmd_queue
        self.res_queue = res_queue
        Process.__init__(self)

    def run(self):
        for f, args, kwargs in iter(self.cmd_queue.get, 
                                    (None, (), {})): # None is our sentinel
            self.res_queue.put( f(*args, **kwargs) )  
            self.cmd_queue.task_done() # Mark the task as done.
        self.res_queue.put(None) # Send this to indicate no more results are coming
        self.cmd_queue.task_done() # Mark the task as done

class Agent(object):
    def __init__(self):
        self.cmd_queue = JoinableQueue()
        self.res_queue = Queue()
        self.worker = Worker(self.cmd_queue, self.res_queue)
        self.worker.start()

    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['cmd_queue']
        del self_dict['res_queue']
        del self_dict['worker']
        return self_dict

    def __setstate__(self, self_dict):
        self.__dict__ = self_dict

    def produce(self, f, *args, **kwargs):
        self.cmd_queue.put((f, args, kwargs))

    def do_some_work(self):
        self.produce(self.foo, waka='waka')

    def do_some_other_work(self):
        self.produce(self.bar, humana='humana')

    def send_sentinel(self):
        self.produce(None)

    def foo(self, **kwargs):
        sleep(2)
        return('this is a foo')

    def bar(self, **kwargs):
        sleep(4)
        return('this is a bar')

    def get_results(self):  #blocking call
        res = []
        self.cmd_queue.join() # This will block until task_done has been called for every put pushed into the queue.
        for out in iter(self.res_queue.get, None):  # None is our sentinel
            res.append(out)
        return res  

#This is the interface I'm looking for.
if __name__=='__main__':
    agents = [Agent() for i in range(50)]
    #this should flow quickly as the calls are added to cmd_queues
    for agent in agents:        
        agent.do_some_work()
        agent.do_some_other_work()  
        agent.send_sentinel()
    for agent in agents:
        print(agent.get_results())

Output:

['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
... (the same line is printed once for each of the 50 agents)