Python 多处理锁
Python Multiprocessing Locks
此多处理代码按预期工作。它创建 4 Python 个进程,并使用它们打印数字 0 到 39,每次打印后都有延迟。
import multiprocessing
import time
def job(num):
print num
time.sleep(1)
pool = multiprocessing.Pool(4)
lst = range(40)
for i in lst:
pool.apply_async(job, [i])
pool.close()
pool.join()
但是,当我尝试使用 multiprocessing.Lock 来防止多个进程打印到标准输出时,程序立即退出而没有任何输出。
import multiprocessing
import time
def job(lock, num):
lock.acquire()
print num
lock.release()
time.sleep(1)
pool = multiprocessing.Pool(4)
l = multiprocessing.Lock()
lst = range(40)
for i in lst:
pool.apply_async(job, [l, i])
pool.close()
pool.join()
为什么引入 multiprocessing.Lock 会使此代码不起作用?
更新:它在全局声明锁时起作用(我做了一些非确定性测试来检查锁是否起作用),这与上面将锁作为参数传递的代码相反(Python 的多处理文档显示锁作为参数传递)。下面的代码有一个全局声明的锁,而不是在上面的代码中作为参数传递。
import multiprocessing
import time
l = multiprocessing.Lock()
def job(num):
l.acquire()
print num
l.release()
time.sleep(1)
pool = multiprocessing.Pool(4)
lst = range(40)
for i in lst:
pool.apply_async(job, [i])
pool.close()
pool.join()
我认为原因是多处理池使用 pickle
在进程之间传输对象。然而,一个 Lock
不能被 pickle:
>>> import multiprocessing
>>> import pickle
>>> lock = multiprocessing.Lock()
>>> lp = pickle.dumps(lock)
Traceback (most recent call last):
File "<pyshell#3>", line 1, in <module>
lp = pickle.dumps(lock)
...
RuntimeError: Lock objects should only be shared between processes through inheritance
>>>
参见 https://docs.python.org/2/library/multiprocessing.html#all-platforms
的 "Picklability" 和 "Better to inherit than pickle/unpickle" 部分
如果将 pool.apply_async
更改为 pool.apply
,则会出现此异常:
Traceback (most recent call last):
File "p.py", line 15, in <module>
pool.apply(job, [l, i])
File "/usr/lib/python2.7/multiprocessing/pool.py", line 244, in apply
return self.apply_async(func, args, kwds).get()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 558, in get
raise self._value
RuntimeError: Lock objects should only be shared between processes through inheritance
pool.apply_async
只是隐藏它。我不想这么说,但使用全局变量可能是您示例的最简单方法。让我们只希望 velociraptors 不会让你失望。
其他答案已经提供了 apply_async
静默失败的答案,除非提供了适当的 error_callback
参数。我仍然发现 OP 的另一点是有效的——官方文档确实显示 multiprocessing.Lock
作为函数参数传递。事实上,Programming guidelines 中标题为 "Explicitly pass resources to child processes" 的 sub-section 建议传递一个 multiprocessing.Lock
object 作为函数参数而不是全局变量。而且,我一直在编写很多代码,在这些代码中我将 multiprocessing.Lock
作为参数传递给 child 进程,并且一切都按预期工作。
那么,是什么原因呢?
我首先调查了multiprocessing.Lock
是不是pickle-able。在Python 3中,MacOS+CPython,试图pickle multiprocessing.Lock
产生其他人遇到的熟悉的RuntimeError
。
>>> pickle.dumps(multiprocessing.Lock())
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-7-66dfe1355652> in <module>
----> 1 pickle.dumps(multiprocessing.Lock())
/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/synchronize.py in __getstate__(self)
99
100 def __getstate__(self):
--> 101 context.assert_spawning(self)
102 sl = self._semlock
103 if sys.platform == 'win32':
/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py in assert_spawning(obj)
354 raise RuntimeError(
355 '%s objects should only be shared between processes'
--> 356 ' through inheritance' % type(obj).__name__
357 )
RuntimeError: Lock objects should only be shared between processes through inheritance
对我来说,这证实了 multiprocessing.Lock
确实不是 pickle-able。
旁白开始
但是,相同 锁仍然需要在两个或多个 python 进程之间共享,这些进程将拥有自己的、可能不同的地址空间(例如当我们使用 "spawn" 或 "forkserver" 作为启动方法)。 multiprocessing
必须做一些特殊的事情来跨进程发送锁。 other Whosebug post 似乎表明在 Unix 系统中,multiprocessing.Lock
可以通过 OS 本身(在 python 之外)支持的命名信号量来实现。然后,两个或多个 python 进程可以 link 到 相同的 锁,该锁实际上驻留在两个 python 进程之外的一个位置。也可能有一个共享内存实现。
一边结束
我们可以将 multiprocessing.Lock
object 作为参数传递吗?
经过更多的实验和更多的阅读,看来区别在于 multiprocessing.Pool
和 multiprocessing.Process
。
multiprocessing.Process
允许您将 multiprocessing.Lock
作为参数传递,但 multiprocessing.Pool
不能。这是一个有效的例子:
import multiprocessing
import time
from multiprocessing import Process, Lock
def task(n: int, lock):
with lock:
print(f'n={n}')
time.sleep(0.25)
if __name__ == '__main__':
multiprocessing.set_start_method('forkserver')
lock = Lock()
processes = [Process(target=task, args=(i, lock)) for i in range(20)]
for process in processes:
process.start()
for process in processes:
process.join()
请注意 __name__ == '__main__'
的使用是必不可少的,如 Programming guidelines 的 "Safe importing of main module" sub-section 中所述。
multiprocessing.Pool
似乎使用 queue.SimpleQueue
将每个任务放在 queue 中,这就是酸洗发生的地方。最有可能的是,multiprocessing.Process
没有使用酸洗(或进行特殊版本的酸洗)。
此多处理代码按预期工作。它创建 4 Python 个进程,并使用它们打印数字 0 到 39,每次打印后都有延迟。
import multiprocessing
import time
def job(num):
print num
time.sleep(1)
pool = multiprocessing.Pool(4)
lst = range(40)
for i in lst:
pool.apply_async(job, [i])
pool.close()
pool.join()
但是,当我尝试使用 multiprocessing.Lock 来防止多个进程打印到标准输出时,程序立即退出而没有任何输出。
import multiprocessing
import time
def job(lock, num):
lock.acquire()
print num
lock.release()
time.sleep(1)
pool = multiprocessing.Pool(4)
l = multiprocessing.Lock()
lst = range(40)
for i in lst:
pool.apply_async(job, [l, i])
pool.close()
pool.join()
为什么引入 multiprocessing.Lock 会使此代码不起作用?
更新:它在全局声明锁时起作用(我做了一些非确定性测试来检查锁是否起作用),这与上面将锁作为参数传递的代码相反(Python 的多处理文档显示锁作为参数传递)。下面的代码有一个全局声明的锁,而不是在上面的代码中作为参数传递。
import multiprocessing
import time
l = multiprocessing.Lock()
def job(num):
l.acquire()
print num
l.release()
time.sleep(1)
pool = multiprocessing.Pool(4)
lst = range(40)
for i in lst:
pool.apply_async(job, [i])
pool.close()
pool.join()
我认为原因是多处理池使用 pickle
在进程之间传输对象。然而,一个 Lock
不能被 pickle:
>>> import multiprocessing
>>> import pickle
>>> lock = multiprocessing.Lock()
>>> lp = pickle.dumps(lock)
Traceback (most recent call last):
File "<pyshell#3>", line 1, in <module>
lp = pickle.dumps(lock)
...
RuntimeError: Lock objects should only be shared between processes through inheritance
>>>
参见 https://docs.python.org/2/library/multiprocessing.html#all-platforms
的 "Picklability" 和 "Better to inherit than pickle/unpickle" 部分如果将 pool.apply_async
更改为 pool.apply
,则会出现此异常:
Traceback (most recent call last):
File "p.py", line 15, in <module>
pool.apply(job, [l, i])
File "/usr/lib/python2.7/multiprocessing/pool.py", line 244, in apply
return self.apply_async(func, args, kwds).get()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 558, in get
raise self._value
RuntimeError: Lock objects should only be shared between processes through inheritance
pool.apply_async
只是隐藏它。我不想这么说,但使用全局变量可能是您示例的最简单方法。让我们只希望 velociraptors 不会让你失望。
其他答案已经提供了 apply_async
静默失败的答案,除非提供了适当的 error_callback
参数。我仍然发现 OP 的另一点是有效的——官方文档确实显示 multiprocessing.Lock
作为函数参数传递。事实上,Programming guidelines 中标题为 "Explicitly pass resources to child processes" 的 sub-section 建议传递一个 multiprocessing.Lock
object 作为函数参数而不是全局变量。而且,我一直在编写很多代码,在这些代码中我将 multiprocessing.Lock
作为参数传递给 child 进程,并且一切都按预期工作。
那么,是什么原因呢?
我首先调查了multiprocessing.Lock
是不是pickle-able。在Python 3中,MacOS+CPython,试图pickle multiprocessing.Lock
产生其他人遇到的熟悉的RuntimeError
。
>>> pickle.dumps(multiprocessing.Lock())
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-7-66dfe1355652> in <module>
----> 1 pickle.dumps(multiprocessing.Lock())
/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/synchronize.py in __getstate__(self)
99
100 def __getstate__(self):
--> 101 context.assert_spawning(self)
102 sl = self._semlock
103 if sys.platform == 'win32':
/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py in assert_spawning(obj)
354 raise RuntimeError(
355 '%s objects should only be shared between processes'
--> 356 ' through inheritance' % type(obj).__name__
357 )
RuntimeError: Lock objects should only be shared between processes through inheritance
对我来说,这证实了 multiprocessing.Lock
确实不是 pickle-able。
旁白开始
但是,相同 锁仍然需要在两个或多个 python 进程之间共享,这些进程将拥有自己的、可能不同的地址空间(例如当我们使用 "spawn" 或 "forkserver" 作为启动方法)。 multiprocessing
必须做一些特殊的事情来跨进程发送锁。 other Whosebug post 似乎表明在 Unix 系统中,multiprocessing.Lock
可以通过 OS 本身(在 python 之外)支持的命名信号量来实现。然后,两个或多个 python 进程可以 link 到 相同的 锁,该锁实际上驻留在两个 python 进程之外的一个位置。也可能有一个共享内存实现。
一边结束
我们可以将 multiprocessing.Lock
object 作为参数传递吗?
经过更多的实验和更多的阅读,看来区别在于 multiprocessing.Pool
和 multiprocessing.Process
。
multiprocessing.Process
允许您将 multiprocessing.Lock
作为参数传递,但 multiprocessing.Pool
不能。这是一个有效的例子:
import multiprocessing
import time
from multiprocessing import Process, Lock
def task(n: int, lock):
with lock:
print(f'n={n}')
time.sleep(0.25)
if __name__ == '__main__':
multiprocessing.set_start_method('forkserver')
lock = Lock()
processes = [Process(target=task, args=(i, lock)) for i in range(20)]
for process in processes:
process.start()
for process in processes:
process.join()
请注意 __name__ == '__main__'
的使用是必不可少的,如 Programming guidelines 的 "Safe importing of main module" sub-section 中所述。
multiprocessing.Pool
似乎使用 queue.SimpleQueue
将每个任务放在 queue 中,这就是酸洗发生的地方。最有可能的是,multiprocessing.Process
没有使用酸洗(或进行特殊版本的酸洗)。