Parallelization using multiprocessing running processes sequentially instead of simultaneously
I am trying to parallelize the piece of code below using the multiprocessing module. Everything I try results in the subprocesses running one after another, even though they all have different PIDs. I have tried:
- CentOS and macOS
- both the spawn and fork contexts
- using queues and pools
- using apply and map, and their async variants
- adding/removing pool.join() and Process.join()
I have no idea what I am doing wrong.
fs.py:
import numpy as np
from time import sleep
import os
def f(r):
    res = np.arange(r[0], r[1])
    print(f'I am {os.getpid()}')
    sleep(10)
    print(f'I am {os.getpid()} and I am finished')
    return {'nums': res, 'dubs': res * 2}
playground.py:
import multiprocessing as mp
import numpy as np
from fs import f
if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    p = ctx.Pool(4)
    with p:
        subsets = [[0, 3], [3, 6], [6, 7]]
        res = [p.apply(f, (subset, )) for subset in subsets]
        print(res)
    print('Done!')
Command: python playground.py
Output:
I am 29881
I am 29881 and I am finished
I am 29882
I am 29882 and I am finished
I am 29881
I am 29881 and I am finished
[{'nums': array([0, 1, 2]), 'dubs': array([0, 2, 4])}, {'nums': array([3, 4, 5]),
'dubs': array([ 6, 8, 10])}, {'nums': array([6]), 'dubs': array([12])}]
Done!
When I use p.map() like this (on Linux Mint):
res = p.map(f, subsets)
then I get:
I am 1337328
I am 1337325
I am 1337327
I am 1337328 and I am finished
I am 1337325 and I am finished
I am 1337327 and I am finished
Maybe you are using map() the wrong way, e.g.:
res = [p.map(f, (subset, )) for subset in subsets]
map() already distributes the iterable across the workers, so call it once with the whole list:
if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    with ctx.Pool(4) as p:
        subsets = [[0, 3], [3, 6], [6, 7]]
        res = p.map(f, subsets)
        print(res)
    print('Done!')
For apply_async you need two loops:
items = [p.apply_async(f, (subset, )) for subset in subsets]
res = [x.get() for x in items]
print(res)
and both must be inside the with p: block:
if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    with ctx.Pool(4) as p:
        subsets = [[0, 3], [3, 6], [6, 7]]
        items = [p.apply_async(f, (subset, )) for subset in subsets]
        print(items)
        res = [x.get() for x in items]
        print(res)
    print('Done!')
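If you don't care about the order of the results, Pool.imap_unordered() is a related option that yields each result as soon as its worker finishes. A minimal sketch (the helper g() and its delays are hypothetical, not from the question):

```python
import multiprocessing as mp
from time import sleep

def g(x):
    # Uneven sleeps so completion order can differ from submission order.
    sleep((x % 3) * 0.1)
    return x * 2

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    with ctx.Pool(4) as p:
        # imap_unordered yields results in completion order;
        # sorting here just makes the output deterministic.
        results = sorted(p.imap_unordered(g, range(6)))
    print(results)  # [0, 2, 4, 6, 8, 10]
```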
The subprocesses run one after another because Pool.apply() blocks until the result is ready, which effectively prevents any parallelism. Using Pool.map_async() avoids this. Note that I also added a random delay inside f() so the processing times vary.
playground.py
import multiprocessing as mp
import numpy as np
from pprint import pprint
from fs import f
if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    pool = ctx.Pool(4)
    with pool:
        subsets = [[0, 3], [3, 6], [6, 7]]
        res = pool.map_async(f, subsets).get(timeout=10)
        pprint(res, sort_dicts=False)
    print('Done!')
fs.py
import numpy as np
import os
import random
from time import sleep
def f(r):
    print(f'f({r}) called')
    res = np.arange(r[0], r[1])
    print(f'I am {os.getpid()}')
    sleep(random.uniform(0, 2))  # Random time delay.
    print(f'I am {os.getpid()} and I am finished')
    return {'nums': res, 'dubs': res * 2}
Result:
f([0, 3]) called
I am 2120
f([3, 6]) called
I am 32208
f([6, 7]) called
I am 13884
I am 2120 and I am finished
I am 13884 and I am finished
I am 32208 and I am finished
[{'nums': array([0, 1, 2]), 'dubs': array([0, 2, 4])},
{'nums': array([3, 4, 5]), 'dubs': array([ 6, 8, 10])},
{'nums': array([6]), 'dubs': array([12])}]
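For completeness, the same pattern can be written with the standard-library concurrent.futures API: ProcessPoolExecutor.map() also submits all tasks up front and runs them in parallel. A sketch with the same subsets (using plain lists instead of numpy arrays to keep it self-contained):

```python
from concurrent.futures import ProcessPoolExecutor

def f(r):
    # Same shape of work as fs.f(), but with plain lists instead of numpy.
    nums = list(range(r[0], r[1]))
    return {'nums': nums, 'dubs': [n * 2 for n in nums]}

if __name__ == '__main__':
    subsets = [[0, 3], [3, 6], [6, 7]]
    with ProcessPoolExecutor(max_workers=4) as ex:
        # map() returns results in submission order, like Pool.map().
        res = list(ex.map(f, subsets))
    print(res)
```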