Python 多处理 - 如何实现共享计数器和队列
Python Multiprocessing - How to implement a shared counter and Queue
我有以下Python3代码:
from multiprocessing import Process, JoinableQueue, Value
from multiprocessing.managers import SyncManager
def test():
global numfail
global queue
while not queue.empty():
number = queue.get()
with numfail.get_lock():
numfail.value += number
queue.task_done()
def startthreads():
global numfail
global queue
queue = JoinableQueue()
for i in range(100):
numfail = Value('i',0)
for j in range(10):
queue.put(j)
for j in range(10):
Process(target=test, args=[]).start()
queue.join()
print(i, numfail.value)
numfail = 0
startthreads()
每次输出都应该是 45,但是有时有些值不会添加到总和中,有时会出现死锁。
我也和经理试过:
from multiprocessing import Process, JoinableQueue, Value
from multiprocessing.managers import SyncManager
def test():
global numfail
global queue
while not queue.empty():
number = queue.get()
numfail.value += number
queue.task_done()
def startthreads():
global numfail
global queue
queue = JoinableQueue()
manager = SyncManager()
manager.start()
for i in range(100):
numfail = manager.Value('i',0)
for j in range(10):
queue.put(j)
for j in range(10):
Process(target=test, args=[]).start()
queue.join()
print(i, numfail.value)
numfail = 0
startthreads()
但是我得到以下错误
Process Process-281:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "MT.py", line 9, in test
numfail.value += number
File "/usr/lib/python3.8/multiprocessing/managers.py", line 1154, in get
return self._callmethod('get')
File "/usr/lib/python3.8/multiprocessing/managers.py", line 850, in _callmethod
raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError:
---------------------------------------------------------------------------
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/managers.py", line 250, in serve_client
self.id_to_local_proxy_obj[ident]
KeyError: '7f8da01ff310'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/managers.py", line 252, in serve_client
raise ke
File "/usr/lib/python3.8/multiprocessing/managers.py", line 246, in serve_client
obj, exposed, gettypeid = id_to_obj[ident]
KeyError: '7f8da01ff310'
---------------------------------------------------------------------------
Process Process-280:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "MT.py", line 9, in test
numfail.value += number
File "/usr/lib/python3.8/multiprocessing/managers.py", line 1154, in get
return self._callmethod('get')
File "/usr/lib/python3.8/multiprocessing/managers.py", line 850, in _callmethod
raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError:
---------------------------------------------------------------------------
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/managers.py", line 250, in serve_client
self.id_to_local_proxy_obj[ident]
KeyError: '7f8da01ff310'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/managers.py", line 252, in serve_client
raise ke
File "/usr/lib/python3.8/multiprocessing/managers.py", line 246, in serve_client
obj, exposed, gettypeid = id_to_obj[ident]
KeyError: '7f8da01ff310'
---------------------------------------------------------------------------
Process Process-275:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "MT.py", line 9, in test
numfail.value += number
File "/usr/lib/python3.8/multiprocessing/managers.py", line 1154, in get
return self._callmethod('get')
File "/usr/lib/python3.8/multiprocessing/managers.py", line 850, in _callmethod
raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError:
---------------------------------------------------------------------------
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/managers.py", line 250, in serve_client
self.id_to_local_proxy_obj[ident]
KeyError: '7f8da01ff310'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/managers.py", line 252, in serve_client
raise ke
File "/usr/lib/python3.8/multiprocessing/managers.py", line 246, in serve_client
obj, exposed, gettypeid = id_to_obj[ident]
KeyError: '7f8da01ff310'
---------------------------------------------------------------------------
如何解决我的问题或如何以另一种方式实现它,以便我可以将队列中的“任务”发送到另一个函数并让该函数处理所有项目并为每个项目增加一个变量项目。
摆脱全局变量并将它们作为参数传递给子进程。
另外,您可以为队列使用 SyncManager 代理:
queue = manager.Queue()
也考虑使用池。
我有以下Python3代码:
from multiprocessing import Process, JoinableQueue, Value
from multiprocessing.managers import SyncManager
def test():
global numfail
global queue
while not queue.empty():
number = queue.get()
with numfail.get_lock():
numfail.value += number
queue.task_done()
def startthreads():
global numfail
global queue
queue = JoinableQueue()
for i in range(100):
numfail = Value('i',0)
for j in range(10):
queue.put(j)
for j in range(10):
Process(target=test, args=[]).start()
queue.join()
print(i, numfail.value)
numfail = 0
startthreads()
每次输出都应该是 45,但是有时有些值不会添加到总和中,有时会出现死锁。
我也和经理试过:
from multiprocessing import Process, JoinableQueue, Value
from multiprocessing.managers import SyncManager
def test():
global numfail
global queue
while not queue.empty():
number = queue.get()
numfail.value += number
queue.task_done()
def startthreads():
global numfail
global queue
queue = JoinableQueue()
manager = SyncManager()
manager.start()
for i in range(100):
numfail = manager.Value('i',0)
for j in range(10):
queue.put(j)
for j in range(10):
Process(target=test, args=[]).start()
queue.join()
print(i, numfail.value)
numfail = 0
startthreads()
但是我得到以下错误
Process Process-281:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "MT.py", line 9, in test
numfail.value += number
File "/usr/lib/python3.8/multiprocessing/managers.py", line 1154, in get
return self._callmethod('get')
File "/usr/lib/python3.8/multiprocessing/managers.py", line 850, in _callmethod
raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError:
---------------------------------------------------------------------------
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/managers.py", line 250, in serve_client
self.id_to_local_proxy_obj[ident]
KeyError: '7f8da01ff310'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/managers.py", line 252, in serve_client
raise ke
File "/usr/lib/python3.8/multiprocessing/managers.py", line 246, in serve_client
obj, exposed, gettypeid = id_to_obj[ident]
KeyError: '7f8da01ff310'
---------------------------------------------------------------------------
Process Process-280:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "MT.py", line 9, in test
numfail.value += number
File "/usr/lib/python3.8/multiprocessing/managers.py", line 1154, in get
return self._callmethod('get')
File "/usr/lib/python3.8/multiprocessing/managers.py", line 850, in _callmethod
raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError:
---------------------------------------------------------------------------
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/managers.py", line 250, in serve_client
self.id_to_local_proxy_obj[ident]
KeyError: '7f8da01ff310'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/managers.py", line 252, in serve_client
raise ke
File "/usr/lib/python3.8/multiprocessing/managers.py", line 246, in serve_client
obj, exposed, gettypeid = id_to_obj[ident]
KeyError: '7f8da01ff310'
---------------------------------------------------------------------------
Process Process-275:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "MT.py", line 9, in test
numfail.value += number
File "/usr/lib/python3.8/multiprocessing/managers.py", line 1154, in get
return self._callmethod('get')
File "/usr/lib/python3.8/multiprocessing/managers.py", line 850, in _callmethod
raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError:
---------------------------------------------------------------------------
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/managers.py", line 250, in serve_client
self.id_to_local_proxy_obj[ident]
KeyError: '7f8da01ff310'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/managers.py", line 252, in serve_client
raise ke
File "/usr/lib/python3.8/multiprocessing/managers.py", line 246, in serve_client
obj, exposed, gettypeid = id_to_obj[ident]
KeyError: '7f8da01ff310'
---------------------------------------------------------------------------
如何解决我的问题或如何以另一种方式实现它,以便我可以将队列中的“任务”发送到另一个函数并让该函数处理所有项目并为每个项目增加一个变量项目。
摆脱全局变量并将它们作为参数传递给子进程。
另外,您可以为队列使用 SyncManager 代理:
queue = manager.Queue()
也考虑使用池。