multiprocessing.Pool: sharing a large read-only list of lists in memory across child processes
I'm struggling with this problem.
I have a huge list of lists that I want to access from parallel code performing a CPU-intensive operation. To do that I'm trying to use multiprocessing.Pool; the problem is that I also need this massive list of lists to be visible to every child process.
Since the 'list of lists' is ragged (for example: [[1, 2], [1, 2, 3]]) I can't store it as an mp.Array, and, as mentioned, I'm not using mp.Process, so I haven't figured out a way to apply mp.Manager to this task. Keeping it as a list of lists matters to me, because I'm applying a function that queries it by index using from operator import itemgetter.
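To show what I mean by querying by index, a tiny standalone sketch (toy data, purely illustrative):

from operator import itemgetter

data = [[1, 3], [3, 1, 3], [1, 2], [2, 0]]
# itemgetter(*indexes) builds a callable that fetches several positions at once
print(itemgetter(1, 2)(data))     # ([3, 1, 3], [1, 2])
print(itemgetter(*[0, 3])(data))  # unpacking an index list works the same: ([1, 3], [2, 0])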
Here is a made-up example of what I'm trying to achieve:
import multiprocessing as mp
from operator import itemgetter
import numpy as np

def foo(indexes):
    # here I must somehow guarantee read access to big_list_of_lists in every child process;
    # with global variables this works for a single process but fails with larger data.
    store_tuples = itemgetter(*indexes)(big_list_of_lists)
    return np.mean([item for sublista in store_tuples for item in sublista])

def main():
    # big_list_of_lists is the variable that I want to share across my child processes
    big_list_of_lists = [[1, 3], [3, 1, 3], [1, 2], [2, 0]]
    ctx = mp.get_context('spawn')
    # big_list_of_lists elements are also passed as args
    # (the Semaphore round-trip is just a roundabout way of asking for mp.cpu_count() workers)
    pool = mp.Pool(ctx.Semaphore(mp.cpu_count()).get_value())
    res = list(pool.map(foo, big_list_of_lists))
    pool.close()
    pool.join()
    return res

if __name__ == '__main__':
    print(main())
    # desired output is equivalent to:
    # a = []
    # for i in big_list_of_lists:
    #     store_tuples = itemgetter(*i)(big_list_of_lists)
    #     a.append(np.mean([item for sublista in store_tuples for item in sublista]))
    # 'a' would be equal to [1.8, 1.5714285714285714, 2.0, 1.75]
Additional details: the solution should ideally work with Python 3.6, and it must work on Windows.
Many thanks!
Using mp.Manager, specifically an mp.Manager.list of mp.Manager.lists, seems to work well for me. I believe this does not copy the full list to every process.
The important line is:
big_list_of_lists_proxy = manager.list([manager.list(sublist) for sublist in big_list_of_lists])
Depending on your use case, you may want to use instead:
big_list_of_lists_proxy = manager.list(big_list_of_lists)
Whether each sublist should itself be a proxy depends on whether the sublists are large and on whether each one is read in full. If a sublist is large, transferring the whole list object to every process that needs it is expensive (O(n) complexity), so a proxy list from the manager should be used; but if every element is going to be needed anyway, the nested proxies bring no benefit.
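To make the tradeoff concrete, here is a small sketch of my own (not part of the solution itself) showing what each construction hands back to a reader:

import multiprocessing as mp

if __name__ == '__main__':
    data = [[1, 3], [3, 1, 3], [1, 2], [2, 0]]
    with mp.Manager() as manager:
        flat = manager.list(data)
        nested = manager.list([manager.list(s) for s in data])
        # the flat proxy ships a whole sublist back as a plain list (O(n) per access)
        print(type(flat[0]))    # <class 'list'>
        # the nested proxy returns another proxy; single elements travel on demand
        print(type(nested[0]))  # <class 'multiprocessing.managers.ListProxy'>
        print(nested[0][1])     # 3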
import multiprocessing as mp
from operator import itemgetter
import numpy as np
from functools import partial

def foo(indexes, big_list_of_lists):
    # read access to big_list_of_lists is now guaranteed in every child process:
    # the manager proxy is passed in explicitly instead of relying on globals.
    store_tuples = itemgetter(*indexes)(big_list_of_lists)
    return np.mean([item for sublista in store_tuples for item in sublista])

def main():
    # big_list_of_lists is the variable that I want to share across my child processes
    big_list_of_lists = [[1, 3], [3, 1, 3], [1, 2], [2, 0]]
    ctx = mp.get_context('spawn')
    with ctx.Manager() as manager:
        big_list_of_lists_proxy = manager.list([manager.list(sublist) for sublist in big_list_of_lists])
        # big_list_of_lists elements are also passed as args
        pool = ctx.Pool(ctx.Semaphore(mp.cpu_count()).get_value())
        res = list(pool.map(partial(foo, big_list_of_lists=big_list_of_lists_proxy), big_list_of_lists))
        pool.close()
        pool.join()
    return res

if __name__ == '__main__':
    print(main())
    # desired output is equivalent to:
    # a = []
    # for i in big_list_of_lists:
    #     store_tuples = itemgetter(*i)(big_list_of_lists)
    #     a.append(np.mean([item for sublista in store_tuples for item in sublista]))
    # 'a' would be equal to [1.8, 1.5714285714285714, 2.0, 1.75]
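Since your data is strictly read-only, one more pattern worth mentioning (my own sketch, not required for the manager solution above): give each worker its own copy once, via a Pool initializer. Under 'spawn', the only start method on Windows, globals are not inherited, so the initializer is what populates them; the list is pickled once per worker instead of once per task, and later reads are local with no IPC round-trips, at the cost of one copy per worker in memory:

import multiprocessing as mp
from operator import itemgetter
import numpy as np

_big = None  # per-worker global, filled in by the initializer

def init_worker(big_list_of_lists):
    # runs once inside each child process right after it starts
    global _big
    _big = big_list_of_lists

def foo(indexes):
    store_tuples = itemgetter(*indexes)(_big)
    return np.mean([item for sublista in store_tuples for item in sublista])

if __name__ == '__main__':
    big_list_of_lists = [[1, 3], [3, 1, 3], [1, 2], [2, 0]]
    ctx = mp.get_context('spawn')
    with ctx.Pool(mp.cpu_count(), initializer=init_worker,
                  initargs=(big_list_of_lists,)) as pool:
        print(pool.map(foo, big_list_of_lists))
    # prints [1.8, 1.5714285714285714, 2.0, 1.75]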