How can I share a class between processes?

I want a global object that is shared and updated by all processes, with minimal locking.

import multiprocessing

class Counter(object):
  def __init__(self):
    self.value = 0

  def update(self, value):
    self.value += value


def update(counter_proxy, thread_id):
  counter_proxy.value.update(1)
  print counter_proxy.value.value, 't%s' % thread_id, \
    multiprocessing.current_process().name
  return counter_proxy.value.value

def main():
  manager = multiprocessing.Manager()
  counter = manager.Value(Counter, Counter())
  pool = multiprocessing.Pool(multiprocessing.cpu_count())
  for i in range(10):
    pool.apply(func = update, args = (counter, i))
  pool.close()
  pool.join()

  print 'Should be 10 but is %s.' % counter.value.value

if __name__ == '__main__':
  main()

The result looks like this: not 10 but zero. It looks like the object's shared value is not being updated. How can I lock and update such a value?

0 t0 PoolWorker-2
0 t1 PoolWorker-3
0 t2 PoolWorker-5
0 t3 PoolWorker-8
0 t4 PoolWorker-9
0 t5 PoolWorker-2
0 t6 PoolWorker-7
0 t7 PoolWorker-4
0 t8 PoolWorker-6
0 t9 PoolWorker-3
Should be 10 but is 0.
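
A minimal Python 3 sketch of what seems to be going on here, assuming the standard `multiprocessing.Manager().Value` proxy: reading `.value` returns a pickled copy of the stored `Counter`, so calling `update` on that copy never reaches the object held by the manager. The `demo` function name is made up for illustration.

```python
import multiprocessing


class Counter:
    def __init__(self):
        self.value = 0

    def update(self, value):
        self.value += value


def demo():
    with multiprocessing.Manager() as manager:
        shared = manager.Value(Counter, Counter())

        # .value fetches a pickled copy of the Counter held by the manager,
        # so this update only touches a temporary local object.
        shared.value.update(1)
        after_method_call = shared.value.value

        # Assigning back through the proxy does replace the stored object,
        # but this read-modify-write is not atomic across processes.
        local_copy = shared.value
        local_copy.update(1)
        shared.value = local_copy
        after_write_back = shared.value.value

        return after_method_call, after_write_back


if __name__ == '__main__':
    print(demo())  # (0, 1)
```

Writing the copy back works in a single process, but between processes the read, update and write-back can interleave, so it does not solve the locking problem.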

The best solution so far is from @dano. I combined a custom manager with a class proxy.

import multiprocessing
from multiprocessing.managers import BaseManager, NamespaceProxy


class Counter(object):
  def __init__(self):
    self.value = 0

  def update(self, value):
    self.value += value


def update(counter_proxy, thread_id):
  counter_proxy.update(1)

class CounterManager(BaseManager):
  pass

class CounterProxy(NamespaceProxy):
  _exposed_ = ('__getattribute__', '__setattr__', '__delattr__', 'update')

  def update(self, value):
    callmethod = object.__getattribute__(self, '_callmethod')
    return callmethod(self.update.__name__, (value,))

CounterManager.register('Counter', Counter, CounterProxy)

def main():
  manager = CounterManager()
  manager.start()

  counter = manager.Counter()
  pool = multiprocessing.Pool(multiprocessing.cpu_count())
  for i in range(10):
    pool.apply(func = update, args = (counter, i))
  pool.close()
  pool.join()

  print 'Should be 10 but is %s.' % counter.value

if __name__ == '__main__':
  main()

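multiprocessing.Value isn't designed for use with custom classes; it's meant to behave like multiprocessing.sharedctypes.Value. Instead, you need to create a custom manager and register your class with it. Your life will also be easier if you don't access value directly, but modify and read it through methods, because public methods are exported by default by the Proxy that gets created for your class. Regular attributes (like Counter.value) are not, so they aren't accessible without additional customization. Here's a working example: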

import multiprocessing
from multiprocessing.managers import BaseManager

class MyManager(BaseManager): pass

def Manager():
    m = MyManager()
    m.start()
    return m 

class Counter(object):
  def __init__(self):
    self._value = 0

  def update(self, value):
    self._value += value

  def get_value(self):
    return self._value

MyManager.register('Counter', Counter)

def update(counter_proxy, thread_id):
  counter_proxy.update(1)
  print counter_proxy.get_value(), 't%s' % thread_id, \
    multiprocessing.current_process().name
  return counter_proxy

def main():
  manager = Manager()
  counter = manager.Counter()
  pool = multiprocessing.Pool(multiprocessing.cpu_count())
  for i in range(10):
    pool.apply(func=update, args=(counter, i))
  pool.close()
  pool.join()

  print 'Should be 10 but is %s.' % counter.get_value()

if __name__ == '__main__':
  main()

Output:

1 t0 PoolWorker-2
2 t1 PoolWorker-8
3 t2 PoolWorker-4
4 t3 PoolWorker-5
5 t4 PoolWorker-6
6 t5 PoolWorker-7
7 t6 PoolWorker-3
8 t7 PoolWorker-9
9 t8 PoolWorker-2
10 t9 PoolWorker-8
Should be 10 but is 10.
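
For Python 3, roughly the same approach can be sketched as below. This is not part of the original answer: the `threading.Lock` inside `Counter`, the `worker` and `run` names, and the switch to `apply_async` are my own assumptions. The lock is there because, as far as I can tell, the manager process serves each client connection on its own thread, so `+=` calls coming from several workers at once could otherwise interleave.

```python
import multiprocessing
import threading
from multiprocessing.managers import BaseManager


class Counter:
    """Lives in the manager process; workers reach it only through a proxy."""

    def __init__(self):
        self._value = 0
        # Assumption: concurrent proxy calls arrive on separate threads in
        # the manager process, so guard the read-modify-write with a lock.
        self._lock = threading.Lock()

    def update(self, value):
        with self._lock:
            self._value += value

    def get_value(self):
        with self._lock:
            return self._value


class CounterManager(BaseManager):
    pass


# Public methods (update, get_value) are exposed by the default proxy.
CounterManager.register('Counter', Counter)


def worker(counter_proxy, task_id):
    counter_proxy.update(1)
    return task_id


def run(n_tasks=10, n_workers=4):
    # Entering the manager starts its server process; leaving shuts it down.
    with CounterManager() as manager:
        counter = manager.Counter()
        with multiprocessing.Pool(n_workers) as pool:
            pending = [pool.apply_async(worker, (counter, i))
                       for i in range(n_tasks)]
            for result in pending:
                result.get()  # wait for the task and re-raise worker errors
        return counter.get_value()


if __name__ == '__main__':
    print('Should be 10 and is %s.' % run())
```

Only the manager process ever touches `_value`, so the workers themselves take no lock; the cost is one round trip to the manager per call.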