使用 python 的多处理并行化具有多个列表参数的函数

Parallelizing a function with multiple lists arguments with python's multiprocessing

我希望这不是重复的,但我找不到针对这个特定问题的完全令人满意的答案。

给定一个具有多个列表参数和一个可迭代的函数,例如这里有两个列表

def function(list1, list2, iterable):
    i1 = 2*iterable
    i2 = 2*iterable+1
    list1[i1] *= 2
    list2[i2] += 2
    return(list1, list2)

每个列表都在不同的条目上访问,因此操作是分开的并且可以并行化。 python 的多重处理的最佳方式是什么?

一种简单的并行化方法是使用映射函数:

import multiprocessing as mp
from functools import partial

list1, list2 = [1,1,1,1,1], [2,2,2,2,2]
func = partial(function, list1, list2)
pool = mp.Pool()
pool.map(func, [0,1])

问题是,如果有人这样做,就会为每个进程生成一份列表副本(如果我理解 map-function 正确的话),然后在这些副本的不同位置并行工作。最后(在触及两个可迭代对象 [0,1] 之后)pool.map 的结果是

[([3, 1, 1, 1, 1], [2, 4, 2, 2, 2]), ([1, 1, 3, 1, 1], [2, 2, 2, 4, 2])]

但我想要

[([3, 1, 3, 1, 1], [2, 4, 2, 4, 2])].

如何实现?是否应该将列表拆分为之前的可迭代对象,运行 并行执行特定操作,然后再次合并它们?

提前致谢,如果我搞混了,请原谅,我刚开始使用多处理库。

编辑: 对列表不同部分的操作可以在没有同步的情况下并行化,对整个列表的操作不能并行化(没有同步)。因此,解决我的特定问题的方法是将列表和函数拆分为操作和列表的一部分。之后合并列表的各个部分以返回整个列表。

您不能在进程之间共享内存(从技术上讲,如果您不更改 objects/affect 引用计数,您可以在基于 fork 的系统上共享内存,这在实际使用中很少发生)- 您的选择是要么使用 shared 结构(其中大部分在 multiprocessing.Manager() 下可用)将为您完成 synchronization/updates,或者仅传递处理所需的数据然后将结果拼接起来。

你的例子很简单,两种方法都可以工作而不会受到严重的惩罚,所以我会和经理一起去:

import multiprocessing
import functools

def your_function(list1, list2, iterable):
    i1 = 2 * iterable
    i2 = 2 * iterable + 1
    list1[i1] *= 2
    list2[i2] += 2

if __name__ == "__main__":  # a multi-processing guard for cross-platform use
    manager = multiprocessing.Manager()
    l1 = manager.list([1, 1, 1, 1, 1])
    l2 = manager.list([2, 2, 2, 2, 2])
    func = functools.partial(your_function, l1, l2)
    pool = multiprocessing.Pool()
    pool.map(func, [0, 1])
    print(l1, l2)  # [2, 1, 2, 1, 1] [2, 4, 2, 4, 2]

或者如果你的用例更倾向于处理后拼接数据:

import multiprocessing
import functools

def your_function(list1, list2, iterable):
    i1 = 2 * iterable
    i2 = 2 * iterable + 1
    return (i1, list1[i1] * 2), (i2, list2[i2] + 2)  # return the changed index and value

if __name__ == "__main__":  # a multi-processing guard for cross-platform use
    l1 = [1, 1, 1, 1, 1]
    l2 = [2, 2, 2, 2, 2]
    func = functools.partial(your_function, l1, l2)
    pool = multiprocessing.Pool()
    results = pool.map(func, [0, 1])
    for r1, r2 in results:  # stitch the results back into l1 and l2
        l1[r1[0]] = r1[1]
        l2[r2[0]] = r2[1]
    print(l1, l2)  # [2, 1, 2, 1, 1] [2, 4, 2, 4, 2]

话虽这么说,但输出并不是您所拥有的 listed/expected,而是根据您的函数应该发生的结果。

此外,如果您的情况如此简单,您可能希望完全避开多处理 - 增加的开销多处理(加上管理器同步)是不值得的,除非 your_function() 确实做了一些 CPU-高强度任务。

这里是问题的解决方案。我不知道这是否是最好的方法,但它有效:

import multiprocessing as mp
from functools import partial

def operation1(lst, pos)
    return(pos, lst[pos] * 2)

def operation2(lst, pos)
    return(pos, lst[pos] + 2)

if __name__ == "__main__":
    list1, list2 = [1,1,1,1,1], [2,2,2,2,2]
    iterable = [0,1]
    index1_list = [2*i for i in iterable]
    index2_list = [2*i+1 for i in iterable]

    func1 = partial(operation1, list1)
    func2 = partial(operation2, list2)

    with mp.Pool() as pool:
        result1 = pool.map(func1, index1_list)
        result2 = pool.map(func2, index2_list)

    for result in result1:
        list1[result[0]] = result[1]

    for result in result2:
        list2[result[0]] = result[1]

    print(list1, list2)