如何在 Python 的多处理池中使用值

How to use Values in a multiprocessing pool with Python

我希望能够使用多处理库中的值模块来跟踪数据。据我所知,在 Python 中进行多处理时,每个进程都有自己的副本,因此我无法编辑全局变量。我希望能够使用 Values 来解决这个问题。有谁知道如何将 Values 数据传递到合并函数中?

from multiprocessing import Pool, Value
import itertools

arr = [2,6,8,7,4,2,5,6,2,4,7,8,5,2,7,4,2,5,6,2,4,7,8,5,2,9,3,2,0,1,5,7,2,8,9,3,2,]

def hello(g, data):
    data.value += 1

if __name__ == '__main__':
    data = Value('i', 0)
    func = partial(hello, data)
    p = Pool(processes=1)
    p.map(hello,itertools.izip(arr,itertools.repeat(data)))

    print data.value

这是我遇到的运行时错误:

RuntimeError: Synchronized objects should only be shared between processes through inheritance

有谁知道我做错了什么?

我不知道为什么,但使用 Pool 似乎存在一些问题,如果手动创建子流程则不会。例如。以下作品:

from multiprocessing import Process, Value

arr = [1,2,3,4,5,6,7,8,9]


def hello(data, g):
    with data.get_lock():
        data.value += 1
    print id(data), g, data.value

if __name__ == '__main__':
    data = Value('i')
    print id(data)

    processes =  []
    for n in arr:
        p = Process(target=hello, args=(data, n))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print "sub process tasks completed"
    print data.value

但是,如果您使用 Pool 进行基本相同的思考,则会出现错误 "RuntimeError: Synchronized objects should only be shared between processes through inheritance"。我以前在使用池时看到过这个错误,但从未完全弄清楚它。

使用似乎与 Pool 一起使用的 Value 的替代方法是使用管理器为您提供 'shared' 列表:

from multiprocessing import Pool, Manager
from functools import partial


arr = [1,2,3,4,5,6,7,8,9]


def hello(data, g):
    data[0] += 1


if __name__ == '__main__':
    m = Manager()
    data = m.list([0])
    hello_data = partial(hello, data)
    p = Pool(processes=5)
    p.map(hello_data, arr)

    print data[0]

几乎不需要将ValuesPool.map()一起使用。

map 的中心思想是将函数应用于列表或其他迭代器中的每个项目,收集列表中的 return 个值。

Pool.map 背后的想法基本相同,但分布在多个进程中。在每个工作进程中,映射函数都会使用迭代器中的项目进行调用。 来自工作进程中调用的函数的 return 值 被传输回父进程并收集到一个列表中,该列表最终被 returned.


或者,您可以使用 Pool.imap_unordered,它会在结果可用时立即开始 returning 结果,而不是等到一切都完成。因此,您可以计算 returned 结果的数量,并使用它来更新进度条。