Python 多处理:有效地只保存最好的运行

Python Multiprocessing: efficiently only save the best runs

我阅读了很多关于使用 multiprocessing 模块进行并行化的文章,但其中 none 完全回答了我的问题。

我有一个很长的生成器给我参数值,我想为每个生成器计算一些函数值。但是,我只想保存最好的 n 很多,因为我只对最好的感兴趣,保存所有结果会耗尽 RAM。 在我看来,有两种方法可以做到这一点:1)在保存最佳值的进程之间使用公共共享内存,或者 2)为每个 core/process 和以后手动保留最佳结果的单独列表将这些列表合并在一起。

我认为第二种方法会更好,但是我不确定如何实现它。 这是我到目前为止得到的:

import numpy as np
import multiprocessing
from functools import partial


def get_generator(length: int):
    for i in range(length):
        yield [i, i + 1]


def some_func(x, other_stuff):
    y = np.sum(x)
    return y


def task(other_stuff, x: np.ndarray):
    val = some_func(x, other_stuff)
    
    if val > task.some_dict['min']:
        task.l.append(val)
        task.some_dict['min'] = val
    return


def task_init(l, some_dict):
    task.l = l
    task.some_dict = some_dict
    task.some_dict['min'] = np.NINF

n = 20
generator = get_generator(n)
other_stuff = np.nan

func = partial(task, other_stuff)

l = multiprocessing.Manager().list()
some_dict = multiprocessing.Manager().dict()

p = multiprocessing.Pool(None, task_init, [l, some_dict])

p.imap(func, generator, chunksize=10000)

p.close()
p.join()

这与我想做的有点相似。但我真的很关心性能,在实际代码中,最佳值的 comparison/saving 会更复杂,所以我认为共享内存方法会非常慢。

我的问题归结为: 如果我有例如8 个核心,我怎么可能有 8 个最佳结果列表,每一个都返回一个核心,以便核心完全独立且相当快地工作?

非常感谢!

这些是我付诸行动的意见。我希望您的实际任务是更复杂的计算,否则几乎不值得使用多处理。

import numpy as np
import multiprocessing
from functools import partial
from heapq import *

def get_generator(length: int):
    for i in range(length):
        yield [i, i + 1]


def some_func(x, other_stuff):
    y = np.sum(x)
    return y


def task(other_stuff, x: np.ndarray):
    val = some_func(x, other_stuff)
    return val



def main():
    n = 20
    generator = get_generator(n)
    other_stuff = np.nan

    func = partial(task, other_stuff)

    cpu_count = multiprocessing.cpu_count() - 1 # leave a processor for the main process
    chunk_size = n // cpu_count
    HEAPSIZE = 8
    with multiprocessing.Pool(cpu_count) as pool:
        heap = []
        for val in pool.imap_unordered(func, generator, chunksize=chunk_size):
            if len(heap) < HEAPSIZE:
                heappush(heap, val)
            elif val > heap[0]:
                heappushpop(heap, val)
        # sort
        values = sorted(heap, reverse=True)
        print(values)


if __name__ == '__main__':
    main()

打印:

[39, 37, 35, 33, 31, 29, 27, 25]

更新

我发现最好通过以下实验为池分配数量等于 mp.cpu_count() - 1 的进程,以便为主进程留出一个空闲处理器来处理工作人员返回的结果。我还尝试了 chunksize 参数:

import multiprocessing as mp
import timeit

def worker_process(i):
    s = 0
    for n in range(10000):
        s += i * i # square the argument
    s /= 10000
    return s

def main():
    cpu_count = mp.cpu_count() - 1 # leave a processor for the main process
    N = 10000
    chunk_size = N // cpu_count # 100 may be good enough
    results = []
    with mp.Pool(cpu_count) as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
            results.append(result)
        #print(results[0:10])

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=10, globals=globals()) / 10)

在我的桌面上(运行 其他进程,例如流媒体音乐),上面的代码将 mp.cpu_count() - 1 分配给 cpu_count 效果更好(2.4 秒对 2.5 秒)。以下是其他时间(四舍五入到小数点后一位):

chunksize = 1428 -> 2.4 seconds (N // (mp.cpu_count() - 1)
chunksize = 1000 -> 2.7 seconds
chunksize = 100 -> 2.4 seconds
chunksize = 10 -> 2.4 seconds
chunksize = 1 -> 2.6 seconds

chunksize 值为 1000 的结果有点异常。我建议尝试不同的值,否则 N // (mp.cpu_count() - 1)。这是假设您可以计算 N,可迭代项中的项目数。当你有一个生成器作为可迭代对象时,在一般情况下,你必须先将它转换为一个列表,才能获得它的长度。在这个特定的基准测试中,即使 chunksize 值为 1 也没有那么糟糕 。但这是我从改变 worker_process 必须做的工作量中学到的:

您的工作进程为完成其任务而必须完成的工作(即 CPU)越多,它对 chunksize 参数的敏感度就越低。 如果它 returns 在使用很少的 CPU 之后,那么传输下一个块的开销变得很大,并且你希望将块传输的数量保持在一个较小的值(即你想要一个大的 chunksize 值)。但是如果过程很长运行,传输下一个块的开销就不会那么有影响。

在下面的代码中,工作进程的 CPU 要求很简单:

import multiprocessing as mp
import timeit

def worker_process(i):
    return i ** 2

def main():
    cpu_count = mp.cpu_count() - 1
    N = 100000
    chunk_size = N // cpu_count
    results = []
    with mp.Pool(cpu_count) as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
            results.append(result)
        print(results[0:10])

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=10, globals=globals()) / 10)

时间:

chunksize = 1428 -> .19 seconds
chunksize = 100 -> .39 seconds
chunksize = 1 -> 11.06 seconds

在下面的代码中,工作进程的 CPU 要求更为重要:

import multiprocessing as mp
import timeit

def worker_process(i):
    s = 0
    for _ in range(1000000):
        s += i * i
    return s // 1000000


def main():
    cpu_count = mp.cpu_count() - 1
    N = 1000
    chunk_size = N // cpu_count
    results = []
    with mp.Pool(cpu_count) as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
            results.append(result)
        print(results[0:10])

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=3, globals=globals()) / 3)

时间:

chunksize = 142 -> 22.6 seconds (N // (mp.cpu_count() - 1)
chunksize = 10 -> 23.5 seconds
chunksize = 1 -> 23.2 seconds

更新 2

根据 ,当使用 chunksize=None 调用方法 mapstarmapmap_async 时,有一种特定的算法用于计算 chunksize,我在下面的代码中使用了它。我不知道为什么方法 imapimap_unordered 的默认值是 1 并且不使用相同的算法。也许是因为这不会像这些方法的描述所暗示的那样“懒惰”。在以下重复先前基准的代码中,我使用相同算法的重新定义来计算默认 chunksize:

import multiprocessing as mp
import timeit

def worker_process(i):
    s = 0
    for _ in range(1000000):
        s += i * i
    return s // 1000000


def compute_chunksize(pool_size, iterable_size):
    if iterable_size == 0:
        return 0
    chunksize, extra = divmod(iterable_size, pool_size * 4)
    if extra:
        chunksize += 1
    return chunksize


def main():
    cpu_count = mp.cpu_count() - 1
    N = 1000
    chunk_size = compute_chunksize(cpu_count, N)
    print('chunk_size =', chunk_size)
    results = []
    with mp.Pool(cpu_count) as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
            results.append(result)
        print(results[0:10])

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=3, globals=globals()) / 3)

时间安排:

chunksize 36 -> 22.2 seconds