
Parallelization using multiprocessing running processes sequentially instead of simultaneously

I am trying to parallelize the piece of code below using the multiprocessing module. Everything I try results in the subprocesses running one after another, even though they all get different PIDs. I have tried:

  1. CentOS and macOS
  2. Both the spawn and fork contexts
  3. Using queues and pools
  4. Using apply and map, and their async variants
  5. Adding/removing pool.join() and Process.join()

I have no idea what I am doing wrong.

fs.py:

import numpy as np
from time import sleep
import os

def f(r):
    res = np.arange(r[0], r[1])
    print(f'I am {os.getpid()}')
    sleep(10)
    print(f'I am {os.getpid()} and I am finished')
    return {'nums': res, 'dubs': res * 2}

playground.py:

import multiprocessing as mp
import numpy as np
from fs import f


if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    p = ctx.Pool(4)
    with p:
        subsets = [[0, 3], [3, 6], [6, 7]]
        res = [p.apply(f, (subset, )) for subset in subsets]
        print(res)

    print('Done!')

Command: python playground.py

Output:

I am 29881
I am 29881 and I am finished
I am 29882
I am 29882 and I am finished
I am 29881
I am 29881 and I am finished
[{'nums': array([0, 1, 2]), 'dubs': array([0, 2, 4])}, {'nums': array([3, 4, 5]), 
  'dubs': array([ 6,  8, 10])}, {'nums': array([6]), 'dubs': array([12])}]
Done!

When I use p.map() like this (on Linux Mint),

res = p.map(f, subsets)

then I get:

I am 1337328
I am 1337325
I am 1337327
I am 1337328 and I am finished
I am 1337325 and I am finished
I am 1337327 and I am finished

Perhaps you are using map() incorrectly, e.g. calling it once per subset as in `res = [p.map(f, (subset, )) for subset in subsets]`, which serializes the work. Pass the whole list in a single call instead:


if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    with ctx.Pool(4) as p:
        subsets = [[0, 3], [3, 6], [6, 7]]
        res = p.map(f, subsets)
        print(res)
        
    print('Done!')

With apply_async you need two loops: one to submit all the jobs, and one to collect the results:

    items = [p.apply_async(f, (subset, )) for subset in subsets]
    res = [x.get() for x in items]
    print(res)

and both loops must be inside the `with p:` block:


if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    with ctx.Pool(4) as p:
        subsets = [[0, 3], [3, 6], [6, 7]]

        items = [p.apply_async(f, (subset, )) for subset in subsets]
        print(items)
        
        res = [x.get() for x in items]
        print(res)
        
    print('Done!')

The subprocesses run one after another because Pool.apply() blocks until the result is ready, which effectively prevents any parallel processing from happening.

Using Pool.map_async() avoids this. Note that I also gave f() a random delay so that the processing times vary.

playground.py

import multiprocessing as mp
import numpy as np
from pprint import pprint
from fs import f


if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    pool = ctx.Pool(4)
    with pool:
        subsets = [[0, 3], [3, 6], [6, 7]]
        res = pool.map_async(f, subsets).get(timeout=10)
    pprint(res, sort_dicts=False)

    print('Done!')

fs.py

import numpy as np
import os
import random
from time import sleep

def f(r):
    print(f'f({r}) called')
    res = np.arange(r[0], r[1])
    print(f'I am {os.getpid()}')
    sleep(random.uniform(0, 2))  # Random time delay.
    print(f'I am {os.getpid()} and I am finished')
    return {'nums': res, 'dubs': res * 2}

Result:

f([0, 3]) called
I am 2120
f([3, 6]) called
I am 32208
f([6, 7]) called
I am 13884
I am 2120 and I am finished
I am 13884 and I am finished
I am 32208 and I am finished
[{'nums': array([0, 1, 2]), 'dubs': array([0, 2, 4])},
 {'nums': array([3, 4, 5]), 'dubs': array([ 6,  8, 10])},
 {'nums': array([6]), 'dubs': array([12])}]