Python Multiprocessing - How to implement a shared counter and Queue

I have the following Python 3 code:

from multiprocessing import Process, JoinableQueue, Value
from multiprocessing.managers import SyncManager

def test():
    global numfail
    global queue
    while not queue.empty():
        number = queue.get()
        with numfail.get_lock():
            numfail.value += number
        queue.task_done()

def startthreads():
    global numfail
    global queue
    queue = JoinableQueue()
    for i in range(100):
        numfail = Value('i',0)
        for j in range(10):
            queue.put(j)
        for j in range(10):
            Process(target=test, args=[]).start()

        queue.join()
        print(i, numfail.value)

numfail = 0
startthreads()

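The output should be 45 every time, but sometimes some values are not added to the sum, and sometimes the program deadlocks.

I also tried it with a Manager: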

from multiprocessing import Process, JoinableQueue, Value
from multiprocessing.managers import SyncManager

def test():
    global numfail
    global queue
    while not queue.empty():
        number = queue.get()
        numfail.value += number
        queue.task_done()

def startthreads():
    global numfail
    global queue
    queue = JoinableQueue()
    manager = SyncManager()
    manager.start()
    for i in range(100):
        numfail = manager.Value('i',0)
        for j in range(10):
            queue.put(j)
        for j in range(10):
            Process(target=test, args=[]).start()

        queue.join()
        print(i, numfail.value)

numfail = 0
startthreads()

But I get the following error:

Process Process-281:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "MT.py", line 9, in test
    numfail.value += number
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 1154, in get
    return self._callmethod('get')
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 850, in _callmethod
    raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError: 
---------------------------------------------------------------------------
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 250, in serve_client
    self.id_to_local_proxy_obj[ident]
KeyError: '7f8da01ff310'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 252, in serve_client
    raise ke
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 246, in serve_client
    obj, exposed, gettypeid = id_to_obj[ident]
KeyError: '7f8da01ff310'
---------------------------------------------------------------------------
Process Process-280:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "MT.py", line 9, in test
    numfail.value += number
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 1154, in get
    return self._callmethod('get')
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 850, in _callmethod
    raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError: 
---------------------------------------------------------------------------
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 250, in serve_client
    self.id_to_local_proxy_obj[ident]
KeyError: '7f8da01ff310'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 252, in serve_client
    raise ke
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 246, in serve_client
    obj, exposed, gettypeid = id_to_obj[ident]
KeyError: '7f8da01ff310'
---------------------------------------------------------------------------
Process Process-275:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "MT.py", line 9, in test
    numfail.value += number
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 1154, in get
    return self._callmethod('get')
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 850, in _callmethod
    raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError: 
---------------------------------------------------------------------------
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 250, in serve_client
    self.id_to_local_proxy_obj[ident]
KeyError: '7f8da01ff310'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 252, in serve_client
    raise ke
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 246, in serve_client
    obj, exposed, gettypeid = id_to_obj[ident]
KeyError: '7f8da01ff310'
---------------------------------------------------------------------------

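How can I fix my problem, or implement this another way, so that I can send the "tasks" in a queue to another function, have that function process all the items, and increment a variable for each item?

Get rid of the global variables and pass them to the child processes as arguments.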
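
A minimal sketch of that suggestion, not a definitive fix: the queue and the counter are created once per round and handed to each worker through `args`. The names `worker` and `run_round` and the `None` sentinel are assumptions of this sketch and do not appear in the question. It also replaces the `while not queue.empty()` loop with a sentinel-based shutdown, because the documentation describes `empty()` as unreliable; a worker can see a non-empty queue and then block forever in `get()` after another worker takes the last item.

```python
from multiprocessing import Process, JoinableQueue, Value

SENTINEL = None  # hypothetical stop marker, one is queued per worker


def worker(queue, counter):
    """Add every number taken from the queue to the shared counter."""
    while True:
        number = queue.get()
        if number is SENTINEL:
            queue.task_done()
            break
        with counter.get_lock():  # += on a Value is not atomic by itself
            counter.value += number
        queue.task_done()


def run_round(num_workers=10, num_items=10):
    """Run one round and return the total accumulated by the workers."""
    queue = JoinableQueue()
    counter = Value('i', 0)
    workers = [Process(target=worker, args=(queue, counter))
               for _ in range(num_workers)]
    for p in workers:
        p.start()
    for j in range(num_items):
        queue.put(j)
    for _ in workers:
        queue.put(SENTINEL)  # tell each worker to exit
    queue.join()             # wait until every item is marked done
    for p in workers:
        p.join()             # no worker survives into the next round
    return counter.value


if __name__ == "__main__":
    for i in range(5):
        print(i, run_round())  # each round prints 45
```

Joining the workers before returning matters here: a worker left over from an earlier round would otherwise keep consuming items and add them to a counter that the parent no longer reads.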

Also, you can use a SyncManager proxy for the queue:

queue = manager.Queue()
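
One way this could look end to end, as a sketch under a few assumptions: the manager is started with `Manager()` (which returns a started `SyncManager`), the proxies are passed to the workers as arguments, and a `manager.Lock()` guards the increment, since `counter.value += number` on a proxy is a separate get and set. The names `worker` and `run_round` and the `None` sentinel are illustrative only.

```python
from multiprocessing import Manager, Process


def worker(queue, counter, lock):
    """Drain the queue, adding each number to the manager-held counter."""
    while True:
        number = queue.get()
        if number is None:       # sentinel (an assumption of this sketch)
            queue.task_done()
            break
        with lock:               # get + set on a proxy are two calls, so guard them
            counter.value += number
        queue.task_done()


def run_round(num_workers=4, num_items=10):
    """Run one round against manager-backed objects and return the total."""
    with Manager() as manager:
        queue = manager.Queue()            # proxy for a queue.Queue in the manager process
        counter = manager.Value('i', 0)
        lock = manager.Lock()
        workers = [Process(target=worker, args=(queue, counter, lock))
                   for _ in range(num_workers)]
        for p in workers:
            p.start()
        for j in range(num_items):
            queue.put(j)
        for _ in workers:
            queue.put(None)                # one sentinel per worker
        queue.join()
        for p in workers:
            p.join()
        return counter.value               # read before the manager shuts down


if __name__ == "__main__":
    print(run_round())  # prints 45
```

Keeping the proxies alive in the parent until every worker has been joined avoids the situation where a worker still holds a reference to an object the manager has already discarded.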

Also consider using a Pool.
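
A rough sketch of the Pool approach, assuming each task can return its contribution instead of mutating shared state: the parent sums the results, so no shared counter, lock, or explicit queue is needed. `process_item` and `run_round` are hypothetical names, and the per-item work is a placeholder.

```python
from multiprocessing import Pool


def process_item(number):
    """Placeholder per-item work; returns the amount to add to the total."""
    return number


def run_round(num_workers=4, num_items=10):
    """Distribute the items over a pool and sum the results in the parent."""
    with Pool(processes=num_workers) as pool:
        # imap_unordered yields results as they finish; order is irrelevant for a sum
        return sum(pool.imap_unordered(process_item, range(num_items)))


if __name__ == "__main__":
    for i in range(5):
        print(i, run_round())  # each round prints 45
```

Because the results are fully consumed inside the `with` block, the pool can be torn down safely when the block exits.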