Why does this small snippet hang when using multiprocessing with maxtasksperchild, numpy.random.randint and numpy.random.seed?

I have a Python script that concurrently processes numpy arrays and images in a random way. To have proper randomness inside the spawned processes, I pass a random seed from the main process to the workers for them to be seeded with.

When I use maxtasksperchild for the Pool, my script hangs after running Pool.map a number of times.

Here is a minimal snippet that reproduces the problem:

# This code stops after multiprocessing.Pool workers are replaced a single time.
# They are replaced due to the maxtasksperchild parameter to Pool
from multiprocessing import Pool
import numpy as np

def worker(n):
    # Removing np.random.seed solves the issue
    np.random.seed(1)  # any seed value
    return 1234  # trivial return value

# Removing maxtasksperchild solves the issue
ppool = Pool(20, maxtasksperchild=5)
i = 0
while True:
    i += 1
    # Removing np.random.randint(10) or taking it out of the loop solves the issue
    rand = np.random.randint(10)
    l = [3]  # trivial input to ppool.map
    result = ppool.map(worker, l)
    print i, result[0]

Here is the output:

1 1234
2 1234
3 1234
.
.
.
99 1234
100 1234 # at this point workers should've reached maxtasksperchild tasks
101 1234
102 1234
103 1234
104 1234
105 1234
106 1234
107 1234
108 1234
109 1234
110 1234

and then it hangs indefinitely.

I can replace numpy.random with Python's own random and the problem goes away. However, in my real application the workers execute user code that I have no control over (it is passed to the worker as an argument), and I want to allow that user code to use numpy.random functions. That is why I deliberately seed the global random generator (independently in each process).
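
For illustration, here is a minimal sketch of that pattern; the Pool initializer and the helper names are mine, not part of the original script. Each worker derives its own seed so the processes do not all produce the same stream:

import os
import numpy as np
from multiprocessing import Pool

def init_worker(base_seed):
    # Hypothetical per-process seeding: seeding every worker with the
    # same value would make all workers emit identical streams, so mix
    # in the PID to derive a distinct seed per process.
    np.random.seed(base_seed + os.getpid())

def run_user_code(n):
    # Stand-in for the uncontrolled user code mentioned above.
    return np.random.randint(10) + n

if __name__ == "__main__":
    pool = Pool(4, initializer=init_worker, initargs=(12345,))
    print(pool.map(run_user_code, [1, 2, 3]))
    pool.close()
    pool.join()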

Tested with Python 2.7.10, numpy 1.11.0, 1.12.0 and 1.13.0, on Ubuntu and OSX.

Using numpy.random.seed is not thread-safe: numpy.random.seed changes the value of the seed globally, whereas, as far as I understand, you are trying to change the seed locally. See the docs for numpy.random.seed.

If what you actually want is to have the generator seeded at the start of each worker, here is a solution:

def worker(n):
    # Create a local generator instead of reseeding the global one
    randgen = np.random.RandomState(45678)  # RandomState, not seed!
    # ...do something with randgen...
    return 1234  # trivial return value
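
A hypothetical extension of that answer, in case each worker should get a distinct stream: pass the seed in with the task and feed it to the local RandomState (the tuple argument is my addition, not from the answer). Since the RandomState and its lock are created inside the child after the fork, the inherited-lock problem described further down never arises:

import numpy as np
from multiprocessing import Pool

def worker(args):
    seed, n = args
    randgen = np.random.RandomState(seed)  # local generator, local lock
    return randgen.randint(10) + n

if __name__ == "__main__":
    pool = Pool(4, maxtasksperchild=5)
    tasks = [(1000 + n, n) for n in range(8)]  # one distinct seed per task
    print(pool.map(worker, tasks))
    pool.close()
    pool.join()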

Posting this as a full answer, since it does not fit in a comment.

After playing around with this for a while, something here smells like a numpy.random bug to me. I was able to reproduce the freeze, and in addition there are some other weird things that should not happen, like manually seeding the generator not working.

import multiprocessing
import numpy as np

def rand_seed(rand, i):
    print(i)
    np.random.seed(i)
    print(i)
    print(rand())

def test1():
    with multiprocessing.Pool() as pool:
        [pool.apply_async(rand_seed, (np.random.random_sample, i)).get()
         for i in range(5)]

test1()

with the output

0
0
0.3205032737431185
1
1
0.3205032737431185
2
2
0.3205032737431185
3
3
0.3205032737431185
4
4
0.3205032737431185

On the other hand, not passing np.random.random_sample as an argument works fine:

def rand_seed2(i):
    print(i)
    np.random.seed(i)
    print(i)
    print(np.random.random_sample())

def test2():
    with multiprocessing.Pool() as pool:
        [pool.apply_async(rand_seed2, (i,)).get()
         for i in range(5)]

test2()

with the output

0
0
0.5488135039273248
1
1
0.417022004702574
2
2
0.43599490214200376
3
3
0.5507979025745755
4
4
0.9670298390136767

This suggests some serious foolery is going on behind the scenes; I am not sure what else can be said about it, though....

Basically, np.random.seed seems to modify not only the "seed state" variable but the random_sample function itself (plausibly because pickling the bound method ships a copy of the parent's hidden RandomState to the child, and the child's np.random.seed call only reseeds the child's own global state, never that copy).
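
A small experiment supporting that hunch (my own sketch, not from the answer): pickling the bound method, as multiprocessing does with task arguments, captures a frozen copy of the hidden RandomState:

import pickle
import numpy as np

np.random.seed(0)
frozen = pickle.dumps(np.random.random_sample)  # captures the current state
np.random.seed(123)  # reseeds the live global generator...
rand = pickle.loads(frozen)
print(rand())                     # ...but the copy still draws from the seed(0) state
print(np.random.random_sample())  # differs: this uses the reseeded global state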

It turns out that this comes from a bad interaction between Python's threading.Lock and multiprocessing.

np.random.seed and most other np.random.* functions use a threading.Lock to ensure thread-safety. An np.random.* function generates a random number and then updates the seed (which is shared across threads), and that is why the lock is needed. See np.random.seed and cont0_array (used by np.random.random() and others).
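
As a quick illustration of that shared global state (a trivial sketch of my own, not from the answer): the module-level np.random.* functions are all bound methods of one hidden RandomState, so reseeding it makes the whole module-level API repeat:

import numpy as np

np.random.seed(42)
first = np.random.random_sample()
np.random.seed(42)
assert np.random.random_sample() == first  # same hidden RandomState behind both calls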

How does this cause the problem in the snippet above?

In short, the snippet hangs because the threading.Lock state is inherited on fork. So when a child happens to be forked at the exact moment the parent holds the lock (inside np.random.randint(10)), the child deadlocks (inside np.random.seed).
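
The same deadlock can be reproduced with a bare threading.Lock and no numpy at all; the following is a POSIX-only sketch of the mechanism (it hangs by design, so only run it to watch the hang):

import os
import threading
import time

lock = threading.Lock()

def hold_lock():
    with lock:
        time.sleep(2)  # the fork below happens while this thread holds the lock

threading.Thread(target=hold_lock).start()
time.sleep(0.5)  # give the thread time to acquire the lock

pid = os.fork()
if pid == 0:
    # Child: the lock was copied in its "held" state, but the thread that
    # held it was not copied, so nothing will ever release it.
    lock.acquire()  # deadlocks here, just like np.random.seed did
    os._exit(0)
else:
    os.waitpid(pid, 0)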

@njsmith explains this in this GitHub issue: https://github.com/numpy/numpy/issues/9248#issuecomment-308054786

multiprocessing.Pool spawns a background thread to manage workers: https://github.com/python/cpython/blob/aefa7ebf0ff0f73feee7ab24f4cdcb2014d83ee5/Lib/multiprocessing/pool.py#L170-L173

It loops in the background calling _maintain_pool: https://github.com/python/cpython/blob/aefa7ebf0ff0f73feee7ab24f4cdcb2014d83ee5/Lib/multiprocessing/pool.py#L366

If a worker exits, for example due to a maxtasksperchild limit, then _maintain_pool calls _repopulate_pool: https://github.com/python/cpython/blob/aefa7ebf0ff0f73feee7ab24f4cdcb2014d83ee5/Lib/multiprocessing/pool.py#L240

And then _repopulate_pool forks some new workers, still in this background thread: https://github.com/python/cpython/blob/aefa7ebf0ff0f73feee7ab24f4cdcb2014d83ee5/Lib/multiprocessing/pool.py#L224

So what's happening is that eventually you get unlucky, and at the same moment that your main thread is calling some np.random function and holding the lock, multiprocessing decides to fork a child, which starts out with the np.random lock already held but the thread that was holding it is gone. Then the child tries to call into np.random, which requires taking the lock, and so the child deadlocks.

The simple workaround here is to not use fork with multiprocessing. If you use the spawn or forkserver start methods then this should go away.

For a proper fix.... ughhh. I guess we.. need to register a pthread_atfork pre-fork handler that takes the np.random lock before fork and then releases it afterwards? And really I guess we need to do this for every lock in numpy, which requires something like keeping a weakset of every RandomState object, and _FFTCache also appears to have a lock...

(On the plus side, this would also give us an opportunity to reinitialize the global random state in the child, which we really should be doing in cases where the user hasn't explicitly seeded it.)
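
A sketch of the spawn workaround suggested above, assuming Python 3.4+ for multiprocessing.get_context; spawned children start from a fresh interpreter instead of inheriting the held lock, so seeding inside the worker becomes safe:

import multiprocessing
import numpy as np

def worker(n):
    np.random.seed(n)  # safe: no lock state is inherited under spawn
    return np.random.randint(10)

if __name__ == "__main__":
    ctx = multiprocessing.get_context("spawn")  # or "forkserver"
    with ctx.Pool(4, maxtasksperchild=5) as pool:
        print(pool.map(worker, range(8)))

As for the pre-fork handler idea, Python 3.7+ exposes os.register_at_fork, which can acquire a lock before the fork and release it in both parent and child afterwards, roughly the pthread_atfork scheme described above:

import os
import threading

some_lock = threading.Lock()  # stand-in for the np.random lock

os.register_at_fork(
    before=some_lock.acquire,          # never fork while someone holds it
    after_in_parent=some_lock.release,
    after_in_child=some_lock.release,  # child starts with the lock free
)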