Why does this small snippet hang when using multiprocessing with maxtasksperchild, numpy.random.randint and numpy.random.seed?
I have a Python script that processes numpy arrays and images in a random fashion. To have proper randomness inside the spawned processes, I pass a random seed from the main process to the workers so they can seed themselves with it.

When I use maxtasksperchild for the Pool, my script hangs after running Pool.map a number of times.

Here is a minimal snippet that reproduces the problem:
# This code stops after multiprocessing.Pool workers are replaced one single time.
# They are replaced due to maxtasksperchild parameter to Pool
from multiprocessing import Pool
import numpy as np

def worker(n):
    # Removing np.random.seed solves the issue
    np.random.seed(1) # any seed value
    return 1234 # trivial return value

# Removing maxtasksperchild solves the issue
ppool = Pool(20, maxtasksperchild=5)

i = 0
while True:
    i += 1
    # Removing np.random.randint(10) or taking it out of the loop solves the issue
    rand = np.random.randint(10)
    l = [3] # trivial input to ppool.map
    result = ppool.map(worker, l)
    print i, result[0]
This is the output:
1 1234
2 1234
3 1234
.
.
.
99 1234
100 1234 # at this point workers should've reached maxtasksperchild tasks
101 1234
102 1234
103 1234
104 1234
105 1234
106 1234
107 1234
108 1234
109 1234
110 1234
and then it hangs indefinitely.
I could replace numpy.random with Python's random and the problem would go away. But in my actual application, the workers execute user code (supplied as an argument to the worker) which I have no control over, and I would like to allow numpy.random functions to be used in that user code. So I intentionally seed the global random generator (independently in each process).

This was tested with Python 2.7.10, numpy 1.11.0, 1.12.0 and 1.13.0, on Ubuntu and OSX.
Using numpy.random.seed is not thread-safe. numpy.random.seed changes the seed globally, while, as far as I understand, you are trying to change the seed locally.

If what you actually want is to have the generator seeded at the start of each worker, the following is a solution:
def worker(n):
    # Removing np.random.seed solves the problem
    randgen = np.random.RandomState(45678) # RandomState, not seed!
    # ...Do something with randgen...
    return 1234 # trivial return value
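For completeness, here is a rough sketch of how per-worker seeds passed from the main process could be combined with RandomState; the (seed, payload) tuples and the seed-drawing step are illustrative assumptions on my part, not part of the original post:

# Illustrative sketch only: give each task its own seed and build a private
# RandomState from it, leaving the global np.random state untouched.
from multiprocessing import Pool
import numpy as np

def worker(args):
    seed, n = args                           # hypothetical (seed, payload) tuple
    randgen = np.random.RandomState(seed)    # private generator for this task
    return randgen.randint(10) + n

if __name__ == '__main__':
    ppool = Pool(4, maxtasksperchild=5)
    master = np.random.RandomState(1234)     # main-process generator used only to draw seeds
    seeds = master.randint(0, 2**32 - 1, size=10)
    print(ppool.map(worker, [(int(s), 0) for s in seeds]))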
Making this a full answer since it doesn't fit in a comment.

After playing around a bit, something here smells like a numpy.random bug. I was able to reproduce the freezing bug, and in addition there were some other strange things that shouldn't happen, such as manually seeding the generator not working.
import multiprocessing
import numpy as np

def rand_seed(rand, i):
    print(i)
    np.random.seed(i)
    print(i)
    print(rand())

def test1():
    with multiprocessing.Pool() as pool:
        [pool.apply_async(rand_seed, (np.random.random_sample, i)).get()
         for i in range(5)]

test1()
with the output:
0
0
0.3205032737431185
1
1
0.3205032737431185
2
2
0.3205032737431185
3
3
0.3205032737431185
4
4
0.3205032737431185
On the other hand, not passing np.random.random_sample as an argument works fine.
def rand_seed2(i):
    print(i)
    np.random.seed(i)
    print(i)
    print(np.random.random_sample())

def test2():
    with multiprocessing.Pool() as pool:
        [pool.apply_async(rand_seed2, (i,)).get()
         for i in range(5)]

test2()
with the output:
0
0
0.5488135039273248
1
1
0.417022004702574
2
2
0.43599490214200376
3
3
0.5507979025745755
4
4
0.9670298390136767
This suggests that some serious foolery is going on behind the curtains, though I'm not sure what more to say about it.

Basically it seems that numpy.random.seed modifies not only the "seed state" variable, but the random_sample function itself.
It turns out this comes from a buggy interaction between Python's threading.Lock and multiprocessing.

np.random.seed and most np.random.* functions use a threading.Lock to ensure thread safety. An np.random.* function generates a random number and then updates the seed (which is shared across threads), which is why the lock is needed. See np.random.seed and cont0_array (used by np.random.random() and others).

How does this cause a problem in the snippet above?

In a nutshell, the snippet hangs because the threading.Lock state is inherited on fork. So when a child is forked at the very moment the parent holds the lock (because of np.random.randint(10)), the child deadlocks (at np.random.seed).
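As a rough illustration of the mechanism (a standalone, Unix-only sketch of my own, not code from the answer or from numpy), a lock that is held by another thread at fork time stays held forever in the child:

# Sketch only: mimic "lock held by one thread while another thread forks".
import os
import threading
import time

lock = threading.Lock()

def hold_lock():
    # Stands in for the thread that is sitting inside an np.random call.
    with lock:
        time.sleep(5)

threading.Thread(target=hold_lock).start()
time.sleep(1)                    # make sure the lock is taken before forking

pid = os.fork()                  # stands in for Pool's worker-replacement fork
if pid == 0:
    # The child gets a copy of the lock in its locked state, but the thread
    # that took it does not exist here, so nothing will ever release it.
    print("child: waiting for the lock...")
    lock.acquire()               # blocks forever, like np.random.seed in the worker
    os._exit(0)
else:
    os.waitpid(pid, 0)           # the parent waits on the stuck child indefinitely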
@njsmith explains it in this github issue https://github.com/numpy/numpy/issues/9248#issuecomment-308054786:
multiprocessing.Pool spawns a background thread to manage workers: https://github.com/python/cpython/blob/aefa7ebf0ff0f73feee7ab24f4cdcb2014d83ee5/Lib/multiprocessing/pool.py#L170-L173
It loops in the background calling _maintain_pool: https://github.com/python/cpython/blob/aefa7ebf0ff0f73feee7ab24f4cdcb2014d83ee5/Lib/multiprocessing/pool.py#L366
If a worker exits, for example due to a maxtasksperchild limit, then _maintain_pool calls _repopulate_pool: https://github.com/python/cpython/blob/aefa7ebf0ff0f73feee7ab24f4cdcb2014d83ee5/Lib/multiprocessing/pool.py#L240
And then _repopulate_pool forks some new workers, still in this background thread: https://github.com/python/cpython/blob/aefa7ebf0ff0f73feee7ab24f4cdcb2014d83ee5/Lib/multiprocessing/pool.py#L224
So what's happening is that eventually you get unlucky, and at the same moment that your main thread is calling some np.random function and holding the lock, multiprocessing decides to fork a child, which starts out with the np.random lock already held but the thread that was holding it is gone. Then the child tries to call into np.random, which requires taking the lock, and so the child deadlocks.
The simple workaround here is to not use fork with multiprocessing. If you use the spawn or forkserver start methods then this should go away.
For a proper fix.... ughhh. I guess we.. need to register a pthread_atfork pre-fork handler that takes the np.random lock before fork and then releases it afterwards? And really I guess we need to do this for every lock in numpy, which requires something like keeping a weakset of every RandomState object, and _FFTCache also appears to have a lock...
(On the plus side, this would also give us an opportunity to reinitialize the global random state in the child, which we really should be doing in cases where the user hasn't explicitly seeded it.)
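Applying the suggested workaround to the original snippet could look roughly like this (a sketch assuming Python 3, where multiprocessing.get_context is available; the names otherwise mirror the question):

from multiprocessing import get_context
import numpy as np

def worker(n):
    np.random.seed(1)    # safe here: spawned workers don't inherit a held lock
    return 1234

if __name__ == '__main__':
    ctx = get_context('spawn')                  # or 'forkserver'
    ppool = ctx.Pool(20, maxtasksperchild=5)
    i = 0
    while True:
        i += 1
        rand = np.random.randint(10)            # no longer risks deadlocking new children
        result = ppool.map(worker, [3])
        print(i, result[0])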