如何在 Python 的多处理池中使用值
How to use Values in a multiprocessing pool with Python
我希望能够使用多处理库中的值模块来跟踪数据。据我所知,在 Python 中进行多处理时,每个进程都有自己的副本,因此我无法编辑全局变量。我希望能够使用 Values 来解决这个问题。有谁知道如何将 Values 数据传递到合并函数中?
from multiprocessing import Pool, Value
import itertools
arr = [2,6,8,7,4,2,5,6,2,4,7,8,5,2,7,4,2,5,6,2,4,7,8,5,2,9,3,2,0,1,5,7,2,8,9,3,2,]
def hello(g, data):
data.value += 1
if __name__ == '__main__':
data = Value('i', 0)
func = partial(hello, data)
p = Pool(processes=1)
p.map(hello,itertools.izip(arr,itertools.repeat(data)))
print data.value
这是我遇到的运行时错误:
RuntimeError: Synchronized objects should only be shared between processes through inheritance
有谁知道我做错了什么?
我不知道为什么,但使用 Pool
似乎存在一些问题,如果手动创建子流程则不会。例如。以下作品:
from multiprocessing import Process, Value
arr = [1,2,3,4,5,6,7,8,9]
def hello(data, g):
with data.get_lock():
data.value += 1
print id(data), g, data.value
if __name__ == '__main__':
data = Value('i')
print id(data)
processes = []
for n in arr:
p = Process(target=hello, args=(data, n))
processes.append(p)
p.start()
for p in processes:
p.join()
print "sub process tasks completed"
print data.value
但是,如果您使用 Pool
进行基本相同的思考,则会出现错误 "RuntimeError: Synchronized objects should only be shared between processes through inheritance"。我以前在使用池时看到过这个错误,但从未完全弄清楚它。
使用似乎与 Pool
一起使用的 Value
的替代方法是使用管理器为您提供 'shared' 列表:
from multiprocessing import Pool, Manager
from functools import partial
arr = [1,2,3,4,5,6,7,8,9]
def hello(data, g):
data[0] += 1
if __name__ == '__main__':
m = Manager()
data = m.list([0])
hello_data = partial(hello, data)
p = Pool(processes=5)
p.map(hello_data, arr)
print data[0]
几乎不需要将Values
与Pool.map()
一起使用。
map
的中心思想是将函数应用于列表或其他迭代器中的每个项目,收集列表中的 return 个值。
Pool.map
背后的想法基本相同,但分布在多个进程中。在每个工作进程中,映射函数都会使用迭代器中的项目进行调用。
来自工作进程中调用的函数的 return 值 被传输回父进程并收集到一个列表中,该列表最终被 returned.
或者,您可以使用 Pool.imap_unordered
,它会在结果可用时立即开始 returning 结果,而不是等到一切都完成。因此,您可以计算 returned 结果的数量,并使用它来更新进度条。
我希望能够使用多处理库中的值模块来跟踪数据。据我所知,在 Python 中进行多处理时,每个进程都有自己的副本,因此我无法编辑全局变量。我希望能够使用 Values 来解决这个问题。有谁知道如何将 Values 数据传递到合并函数中?
from multiprocessing import Pool, Value
import itertools
arr = [2,6,8,7,4,2,5,6,2,4,7,8,5,2,7,4,2,5,6,2,4,7,8,5,2,9,3,2,0,1,5,7,2,8,9,3,2,]
def hello(g, data):
data.value += 1
if __name__ == '__main__':
data = Value('i', 0)
func = partial(hello, data)
p = Pool(processes=1)
p.map(hello,itertools.izip(arr,itertools.repeat(data)))
print data.value
这是我遇到的运行时错误:
RuntimeError: Synchronized objects should only be shared between processes through inheritance
有谁知道我做错了什么?
我不知道为什么,但使用 Pool
似乎存在一些问题,如果手动创建子流程则不会。例如。以下作品:
from multiprocessing import Process, Value
arr = [1,2,3,4,5,6,7,8,9]
def hello(data, g):
with data.get_lock():
data.value += 1
print id(data), g, data.value
if __name__ == '__main__':
data = Value('i')
print id(data)
processes = []
for n in arr:
p = Process(target=hello, args=(data, n))
processes.append(p)
p.start()
for p in processes:
p.join()
print "sub process tasks completed"
print data.value
但是,如果您使用 Pool
进行基本相同的思考,则会出现错误 "RuntimeError: Synchronized objects should only be shared between processes through inheritance"。我以前在使用池时看到过这个错误,但从未完全弄清楚它。
使用似乎与 Pool
一起使用的 Value
的替代方法是使用管理器为您提供 'shared' 列表:
from multiprocessing import Pool, Manager
from functools import partial
arr = [1,2,3,4,5,6,7,8,9]
def hello(data, g):
data[0] += 1
if __name__ == '__main__':
m = Manager()
data = m.list([0])
hello_data = partial(hello, data)
p = Pool(processes=5)
p.map(hello_data, arr)
print data[0]
几乎不需要将Values
与Pool.map()
一起使用。
map
的中心思想是将函数应用于列表或其他迭代器中的每个项目,收集列表中的 return 个值。
Pool.map
背后的想法基本相同,但分布在多个进程中。在每个工作进程中,映射函数都会使用迭代器中的项目进行调用。
来自工作进程中调用的函数的 return 值 被传输回父进程并收集到一个列表中,该列表最终被 returned.
或者,您可以使用 Pool.imap_unordered
,它会在结果可用时立即开始 returning 结果,而不是等到一切都完成。因此,您可以计算 returned 结果的数量,并使用它来更新进度条。