并行化 Python 的 reduce 命令
Parallelize Python's reduce command
在 Python 我是 运行 形式的命令
reduce(func, bigArray[1:], bigArray[0])
并且我想添加并行处理以加快速度。
我知道我可以通过拆分数组、运行 在单独的部分上处理并合并结果来手动执行此操作。
但是,考虑到 运行 并行减少的普遍性,我想看看是否有本地方法或库可以自动执行此操作。
我是 运行 一台 6 核机器。
对于遇到这个问题的任何人,我最终写了一个帮助程序来完成它
def parallelReduce(l, numCPUs, connection=None):
if numCPUs == 1 or len(l) <= 100:
returnVal= reduce(reduceFunc, l[1:], l[0])
if connection != None:
connection.send(returnVal)
return returnVal
parent1, child1 = multiprocessing.Pipe()
parent2, child2 = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=parallelReduce, args=(l[:len(l) // 2], numCPUs // 2, child1, ) )
p2 = multiprocessing.Process(target=parallelReduce, args=(l[len(l) // 2:], numCPUs // 2 + numCPUs%2, child2, ) )
p1.start()
p2.start()
leftReturn, rightReturn = parent1.recv(), parent2.recv()
p1.join()
p2.join()
returnVal = reduceFunc(leftReturn, rightReturn)
if connection != None:
connection.send(returnVal)
return returnVal
请注意,您可以通过 multiprocessing.cpu_count()
获取 CPU 数量
使用这个函数比串行版本显着提高了性能。
如果您能够合并 map 和 reduce(或者想要连接结果而不是更一般的 reduce),您可以使用 mr4p:
https://github.com/lapets/mr4mp
class 中的 _reduce 函数的代码似乎通过 multiprocessing.pool 实现并行处理以汇集通常的 reduce 进程,大致遵循以下进程:
reduce(<Function used to reduce>, pool.map(partial(reduce, <function used to reduce>), <List of results to reduce>))
我还没有尝试过,但语法似乎是:
mr4mp.pool().mapreduce(<Function to be mapped>,<Function used to reduce>, <List of entities to apply function on>)
在 Python 我是 运行 形式的命令
reduce(func, bigArray[1:], bigArray[0])
并且我想添加并行处理以加快速度。
我知道我可以通过拆分数组、运行 在单独的部分上处理并合并结果来手动执行此操作。
但是,考虑到 运行 并行减少的普遍性,我想看看是否有本地方法或库可以自动执行此操作。
我是 运行 一台 6 核机器。
对于遇到这个问题的任何人,我最终写了一个帮助程序来完成它
def parallelReduce(l, numCPUs, connection=None):
if numCPUs == 1 or len(l) <= 100:
returnVal= reduce(reduceFunc, l[1:], l[0])
if connection != None:
connection.send(returnVal)
return returnVal
parent1, child1 = multiprocessing.Pipe()
parent2, child2 = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=parallelReduce, args=(l[:len(l) // 2], numCPUs // 2, child1, ) )
p2 = multiprocessing.Process(target=parallelReduce, args=(l[len(l) // 2:], numCPUs // 2 + numCPUs%2, child2, ) )
p1.start()
p2.start()
leftReturn, rightReturn = parent1.recv(), parent2.recv()
p1.join()
p2.join()
returnVal = reduceFunc(leftReturn, rightReturn)
if connection != None:
connection.send(returnVal)
return returnVal
请注意,您可以通过 multiprocessing.cpu_count()
使用这个函数比串行版本显着提高了性能。
如果您能够合并 map 和 reduce(或者想要连接结果而不是更一般的 reduce),您可以使用 mr4p:
https://github.com/lapets/mr4mp
class 中的 _reduce 函数的代码似乎通过 multiprocessing.pool 实现并行处理以汇集通常的 reduce 进程,大致遵循以下进程:
reduce(<Function used to reduce>, pool.map(partial(reduce, <function used to reduce>), <List of results to reduce>))
我还没有尝试过,但语法似乎是:
mr4mp.pool().mapreduce(<Function to be mapped>,<Function used to reduce>, <List of entities to apply function on>)