Python 多处理:有效地只保存最好的运行
Python Multiprocessing: efficiently only save the best runs
我阅读了很多关于使用 multiprocessing
模块进行并行化的文章,但其中 none 完全回答了我的问题。
我有一个很长的生成器给我参数值,我想为每个生成器计算一些函数值。但是,我只想保存最好的 n
很多,因为我只对最好的感兴趣,保存所有结果会耗尽 RAM。
在我看来,有两种方法可以做到这一点:1)在保存最佳值的进程之间使用公共共享内存,或者 2)为每个 core/process 和以后手动保留最佳结果的单独列表将这些列表合并在一起。
我认为第二种方法会更好,但是我不确定如何实现它。
这是我到目前为止得到的:
import numpy as np
import multiprocessing
from functools import partial
def get_generator(length: int):
for i in range(length):
yield [i, i + 1]
def some_func(x, other_stuff):
y = np.sum(x)
return y
def task(other_stuff, x: np.ndarray):
val = some_func(x, other_stuff)
if val > task.some_dict['min']:
task.l.append(val)
task.some_dict['min'] = val
return
def task_init(l, some_dict):
task.l = l
task.some_dict = some_dict
task.some_dict['min'] = np.NINF
n = 20
generator = get_generator(n)
other_stuff = np.nan
func = partial(task, other_stuff)
l = multiprocessing.Manager().list()
some_dict = multiprocessing.Manager().dict()
p = multiprocessing.Pool(None, task_init, [l, some_dict])
p.imap(func, generator, chunksize=10000)
p.close()
p.join()
这与我想做的有点相似。但我真的很关心性能,在实际代码中,最佳值的 comparison/saving 会更复杂,所以我认为共享内存方法会非常慢。
我的问题归结为:
如果我有例如8 个核心,我怎么可能有 8 个最佳结果列表,每一个都返回一个核心,以便核心完全独立且相当快地工作?
非常感谢!
这些是我付诸行动的意见。我希望您的实际任务是更复杂的计算,否则几乎不值得使用多处理。
import numpy as np
import multiprocessing
from functools import partial
from heapq import *
def get_generator(length: int):
for i in range(length):
yield [i, i + 1]
def some_func(x, other_stuff):
y = np.sum(x)
return y
def task(other_stuff, x: np.ndarray):
val = some_func(x, other_stuff)
return val
def main():
n = 20
generator = get_generator(n)
other_stuff = np.nan
func = partial(task, other_stuff)
cpu_count = multiprocessing.cpu_count() - 1 # leave a processor for the main process
chunk_size = n // cpu_count
HEAPSIZE = 8
with multiprocessing.Pool(cpu_count) as pool:
heap = []
for val in pool.imap_unordered(func, generator, chunksize=chunk_size):
if len(heap) < HEAPSIZE:
heappush(heap, val)
elif val > heap[0]:
heappushpop(heap, val)
# sort
values = sorted(heap, reverse=True)
print(values)
if __name__ == '__main__':
main()
打印:
[39, 37, 35, 33, 31, 29, 27, 25]
更新
我发现最好通过以下实验为池分配数量等于 mp.cpu_count() - 1
的进程,以便为主进程留出一个空闲处理器来处理工作人员返回的结果。我还尝试了 chunksize
参数:
import multiprocessing as mp
import timeit
def worker_process(i):
s = 0
for n in range(10000):
s += i * i # square the argument
s /= 10000
return s
def main():
cpu_count = mp.cpu_count() - 1 # leave a processor for the main process
N = 10000
chunk_size = N // cpu_count # 100 may be good enough
results = []
with mp.Pool(cpu_count) as pool:
for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
results.append(result)
#print(results[0:10])
if __name__ == '__main__':
print(timeit.timeit(stmt='main()', number=10, globals=globals()) / 10)
在我的桌面上(运行 其他进程,例如流媒体音乐),上面的代码将 mp.cpu_count() - 1
分配给 cpu_count
效果更好(2.4 秒对 2.5 秒)。以下是其他时间(四舍五入到小数点后一位):
chunksize = 1428 -> 2.4 seconds (N // (mp.cpu_count() - 1)
chunksize = 1000 -> 2.7 seconds
chunksize = 100 -> 2.4 seconds
chunksize = 10 -> 2.4 seconds
chunksize = 1 -> 2.6 seconds
chunksize 值为 1000 的结果有点异常。我建议尝试不同的值,否则 N // (mp.cpu_count() - 1)
。这是假设您可以计算 N
,可迭代项中的项目数。当你有一个生成器作为可迭代对象时,在一般情况下,你必须先将它转换为一个列表,才能获得它的长度。在这个特定的基准测试中,即使 chunksize
值为 1 也没有那么糟糕 。但这是我从改变 worker_process
必须做的工作量中学到的:
您的工作进程为完成其任务而必须完成的工作(即 CPU)越多,它对 chunksize
参数的敏感度就越低。 如果它 returns 在使用很少的 CPU 之后,那么传输下一个块的开销变得很大,并且你希望将块传输的数量保持在一个较小的值(即你想要一个大的 chunksize
值)。但是如果过程很长运行,传输下一个块的开销就不会那么有影响。
在下面的代码中,工作进程的 CPU 要求很简单:
import multiprocessing as mp
import timeit
def worker_process(i):
return i ** 2
def main():
cpu_count = mp.cpu_count() - 1
N = 100000
chunk_size = N // cpu_count
results = []
with mp.Pool(cpu_count) as pool:
for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
results.append(result)
print(results[0:10])
if __name__ == '__main__':
print(timeit.timeit(stmt='main()', number=10, globals=globals()) / 10)
时间:
chunksize = 1428 -> .19 seconds
chunksize = 100 -> .39 seconds
chunksize = 1 -> 11.06 seconds
在下面的代码中,工作进程的 CPU 要求更为重要:
import multiprocessing as mp
import timeit
def worker_process(i):
s = 0
for _ in range(1000000):
s += i * i
return s // 1000000
def main():
cpu_count = mp.cpu_count() - 1
N = 1000
chunk_size = N // cpu_count
results = []
with mp.Pool(cpu_count) as pool:
for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
results.append(result)
print(results[0:10])
if __name__ == '__main__':
print(timeit.timeit(stmt='main()', number=3, globals=globals()) / 3)
时间:
chunksize = 142 -> 22.6 seconds (N // (mp.cpu_count() - 1)
chunksize = 10 -> 23.5 seconds
chunksize = 1 -> 23.2 seconds
更新 2
根据 ,当使用 chunksize=None
调用方法 map
、starmap
或 map_async
时,有一种特定的算法用于计算 chunksize
,我在下面的代码中使用了它。我不知道为什么方法 imap
和 imap_unordered
的默认值是 1 并且不使用相同的算法。也许是因为这不会像这些方法的描述所暗示的那样“懒惰”。在以下重复先前基准的代码中,我使用相同算法的重新定义来计算默认 chunksize
:
import multiprocessing as mp
import timeit
def worker_process(i):
s = 0
for _ in range(1000000):
s += i * i
return s // 1000000
def compute_chunksize(pool_size, iterable_size):
if iterable_size == 0:
return 0
chunksize, extra = divmod(iterable_size, pool_size * 4)
if extra:
chunksize += 1
return chunksize
def main():
cpu_count = mp.cpu_count() - 1
N = 1000
chunk_size = compute_chunksize(cpu_count, N)
print('chunk_size =', chunk_size)
results = []
with mp.Pool(cpu_count) as pool:
for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
results.append(result)
print(results[0:10])
if __name__ == '__main__':
print(timeit.timeit(stmt='main()', number=3, globals=globals()) / 3)
时间安排:
chunksize 36 -> 22.2 seconds
我阅读了很多关于使用 multiprocessing
模块进行并行化的文章,但其中 none 完全回答了我的问题。
我有一个很长的生成器给我参数值,我想为每个生成器计算一些函数值。但是,我只想保存最好的 n
很多,因为我只对最好的感兴趣,保存所有结果会耗尽 RAM。
在我看来,有两种方法可以做到这一点:1)在保存最佳值的进程之间使用公共共享内存,或者 2)为每个 core/process 和以后手动保留最佳结果的单独列表将这些列表合并在一起。
我认为第二种方法会更好,但是我不确定如何实现它。 这是我到目前为止得到的:
import numpy as np
import multiprocessing
from functools import partial
def get_generator(length: int):
for i in range(length):
yield [i, i + 1]
def some_func(x, other_stuff):
y = np.sum(x)
return y
def task(other_stuff, x: np.ndarray):
val = some_func(x, other_stuff)
if val > task.some_dict['min']:
task.l.append(val)
task.some_dict['min'] = val
return
def task_init(l, some_dict):
task.l = l
task.some_dict = some_dict
task.some_dict['min'] = np.NINF
n = 20
generator = get_generator(n)
other_stuff = np.nan
func = partial(task, other_stuff)
l = multiprocessing.Manager().list()
some_dict = multiprocessing.Manager().dict()
p = multiprocessing.Pool(None, task_init, [l, some_dict])
p.imap(func, generator, chunksize=10000)
p.close()
p.join()
这与我想做的有点相似。但我真的很关心性能,在实际代码中,最佳值的 comparison/saving 会更复杂,所以我认为共享内存方法会非常慢。
我的问题归结为: 如果我有例如8 个核心,我怎么可能有 8 个最佳结果列表,每一个都返回一个核心,以便核心完全独立且相当快地工作?
非常感谢!
这些是我付诸行动的意见。我希望您的实际任务是更复杂的计算,否则几乎不值得使用多处理。
import numpy as np
import multiprocessing
from functools import partial
from heapq import *
def get_generator(length: int):
for i in range(length):
yield [i, i + 1]
def some_func(x, other_stuff):
y = np.sum(x)
return y
def task(other_stuff, x: np.ndarray):
val = some_func(x, other_stuff)
return val
def main():
n = 20
generator = get_generator(n)
other_stuff = np.nan
func = partial(task, other_stuff)
cpu_count = multiprocessing.cpu_count() - 1 # leave a processor for the main process
chunk_size = n // cpu_count
HEAPSIZE = 8
with multiprocessing.Pool(cpu_count) as pool:
heap = []
for val in pool.imap_unordered(func, generator, chunksize=chunk_size):
if len(heap) < HEAPSIZE:
heappush(heap, val)
elif val > heap[0]:
heappushpop(heap, val)
# sort
values = sorted(heap, reverse=True)
print(values)
if __name__ == '__main__':
main()
打印:
[39, 37, 35, 33, 31, 29, 27, 25]
更新
我发现最好通过以下实验为池分配数量等于 mp.cpu_count() - 1
的进程,以便为主进程留出一个空闲处理器来处理工作人员返回的结果。我还尝试了 chunksize
参数:
import multiprocessing as mp
import timeit
def worker_process(i):
s = 0
for n in range(10000):
s += i * i # square the argument
s /= 10000
return s
def main():
cpu_count = mp.cpu_count() - 1 # leave a processor for the main process
N = 10000
chunk_size = N // cpu_count # 100 may be good enough
results = []
with mp.Pool(cpu_count) as pool:
for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
results.append(result)
#print(results[0:10])
if __name__ == '__main__':
print(timeit.timeit(stmt='main()', number=10, globals=globals()) / 10)
在我的桌面上(运行 其他进程,例如流媒体音乐),上面的代码将 mp.cpu_count() - 1
分配给 cpu_count
效果更好(2.4 秒对 2.5 秒)。以下是其他时间(四舍五入到小数点后一位):
chunksize = 1428 -> 2.4 seconds (N // (mp.cpu_count() - 1)
chunksize = 1000 -> 2.7 seconds
chunksize = 100 -> 2.4 seconds
chunksize = 10 -> 2.4 seconds
chunksize = 1 -> 2.6 seconds
chunksize 值为 1000 的结果有点异常。我建议尝试不同的值,否则 N // (mp.cpu_count() - 1)
。这是假设您可以计算 N
,可迭代项中的项目数。当你有一个生成器作为可迭代对象时,在一般情况下,你必须先将它转换为一个列表,才能获得它的长度。在这个特定的基准测试中,即使 chunksize
值为 1 也没有那么糟糕 。但这是我从改变 worker_process
必须做的工作量中学到的:
您的工作进程为完成其任务而必须完成的工作(即 CPU)越多,它对 chunksize
参数的敏感度就越低。 如果它 returns 在使用很少的 CPU 之后,那么传输下一个块的开销变得很大,并且你希望将块传输的数量保持在一个较小的值(即你想要一个大的 chunksize
值)。但是如果过程很长运行,传输下一个块的开销就不会那么有影响。
在下面的代码中,工作进程的 CPU 要求很简单:
import multiprocessing as mp
import timeit
def worker_process(i):
return i ** 2
def main():
cpu_count = mp.cpu_count() - 1
N = 100000
chunk_size = N // cpu_count
results = []
with mp.Pool(cpu_count) as pool:
for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
results.append(result)
print(results[0:10])
if __name__ == '__main__':
print(timeit.timeit(stmt='main()', number=10, globals=globals()) / 10)
时间:
chunksize = 1428 -> .19 seconds
chunksize = 100 -> .39 seconds
chunksize = 1 -> 11.06 seconds
在下面的代码中,工作进程的 CPU 要求更为重要:
import multiprocessing as mp
import timeit
def worker_process(i):
s = 0
for _ in range(1000000):
s += i * i
return s // 1000000
def main():
cpu_count = mp.cpu_count() - 1
N = 1000
chunk_size = N // cpu_count
results = []
with mp.Pool(cpu_count) as pool:
for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
results.append(result)
print(results[0:10])
if __name__ == '__main__':
print(timeit.timeit(stmt='main()', number=3, globals=globals()) / 3)
时间:
chunksize = 142 -> 22.6 seconds (N // (mp.cpu_count() - 1)
chunksize = 10 -> 23.5 seconds
chunksize = 1 -> 23.2 seconds
更新 2
根据 chunksize=None
调用方法 map
、starmap
或 map_async
时,有一种特定的算法用于计算 chunksize
,我在下面的代码中使用了它。我不知道为什么方法 imap
和 imap_unordered
的默认值是 1 并且不使用相同的算法。也许是因为这不会像这些方法的描述所暗示的那样“懒惰”。在以下重复先前基准的代码中,我使用相同算法的重新定义来计算默认 chunksize
:
import multiprocessing as mp
import timeit
def worker_process(i):
s = 0
for _ in range(1000000):
s += i * i
return s // 1000000
def compute_chunksize(pool_size, iterable_size):
if iterable_size == 0:
return 0
chunksize, extra = divmod(iterable_size, pool_size * 4)
if extra:
chunksize += 1
return chunksize
def main():
cpu_count = mp.cpu_count() - 1
N = 1000
chunk_size = compute_chunksize(cpu_count, N)
print('chunk_size =', chunk_size)
results = []
with mp.Pool(cpu_count) as pool:
for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
results.append(result)
print(results[0:10])
if __name__ == '__main__':
print(timeit.timeit(stmt='main()', number=3, globals=globals()) / 3)
时间安排:
chunksize 36 -> 22.2 seconds