如何更改并行进程数?
How to change number of parallel processes?
我有一个 python 脚本,其中 运行 是一个并行方法。
parsers = {
'parser1': parser1.process,
'parser2': parser2.process
}
def process((key, value)):
parsers[key](value)
pool = Pool(4)
pool.map(process_items, items)
process_items
是我的方法,items
是一个元组列表,每个元组有两个元素。 items
列表有大约 10 万个项目。
process_items
将根据给定的参数调用方法。我的问题可能是列表的 70% 我可以 运行 具有高并行性,但其他 30% 只能 运行 使用 1/2 线程否则会导致我无法控制的故障。
所以在我的代码中我有大约 10 个不同的解析器进程。对于说 1-8 我想 运行 与 Pool(4) 但 9-10 Pool(2).
优化它的最佳方法是什么?
您可以在 multiprocessing.Pool
的构造函数中指定并行线程数:
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(5) # 5 is the number of parallel threads
print pool.map(f, [1, 2, 3])
我认为你最好的选择是在这里使用两个池:
from multiprocessing import Pool
# import parsers here
parsers = {
'parser1': parser1.process,
'parser2': parser2.process,
'parser3': parser3.process,
'parser4': parser4.process,
'parser5': parser5.process,
'parser6': parser6.process,
'parser7': parser7.process,
}
# Sets that define which items can use high parallelism,
# and which must use low
high_par = {"parser1", "parser3", "parser4", "parser6", "parser7"}
low_par = {"parser2", "parser5"}
def process_items(key, value):
parsers[key](value)
def run_pool(func, items, num_items, check_set):
pool = Pool(num_items)
out = pool.map(func, (item for item in items if item[0] in check_set))
pool.close()
pool.join()
return out
if __name__ == "__main__":
items = [('parser2', x), ...] # Your list of tuples
# Process with high parallelism
high_results = run_pool(process_items, items, 4, high_par)
# Process with low parallelism
low_results = run_pool(process_items, items, 2, low_par)
通过巧妙地使用同步原语,尝试在一个 Pool
中做到这一点是可能的,但我认为它最终看起来不会比这更干净。它也可能最终 运行 效率降低,因为有时您的池需要等待工作完成,因此它可以处理低并行度项目,即使队列中它后面有高并行度项目。
如果您需要从每个 process_items
调用中获取与它们落在原始可迭代对象中的顺序相同的结果,这会变得有点复杂,这意味着每个 Pool
的结果需要合并,但根据您的示例,我认为这不是必需的。如果是,请告诉我,我会相应地调整我的答案。
我有一个 python 脚本,其中 运行 是一个并行方法。
parsers = {
'parser1': parser1.process,
'parser2': parser2.process
}
def process((key, value)):
parsers[key](value)
pool = Pool(4)
pool.map(process_items, items)
process_items
是我的方法,items
是一个元组列表,每个元组有两个元素。 items
列表有大约 10 万个项目。
process_items
将根据给定的参数调用方法。我的问题可能是列表的 70% 我可以 运行 具有高并行性,但其他 30% 只能 运行 使用 1/2 线程否则会导致我无法控制的故障。
所以在我的代码中我有大约 10 个不同的解析器进程。对于说 1-8 我想 运行 与 Pool(4) 但 9-10 Pool(2).
优化它的最佳方法是什么?
您可以在 multiprocessing.Pool
的构造函数中指定并行线程数:
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(5) # 5 is the number of parallel threads
print pool.map(f, [1, 2, 3])
我认为你最好的选择是在这里使用两个池:
from multiprocessing import Pool
# import parsers here
parsers = {
'parser1': parser1.process,
'parser2': parser2.process,
'parser3': parser3.process,
'parser4': parser4.process,
'parser5': parser5.process,
'parser6': parser6.process,
'parser7': parser7.process,
}
# Sets that define which items can use high parallelism,
# and which must use low
high_par = {"parser1", "parser3", "parser4", "parser6", "parser7"}
low_par = {"parser2", "parser5"}
def process_items(key, value):
parsers[key](value)
def run_pool(func, items, num_items, check_set):
pool = Pool(num_items)
out = pool.map(func, (item for item in items if item[0] in check_set))
pool.close()
pool.join()
return out
if __name__ == "__main__":
items = [('parser2', x), ...] # Your list of tuples
# Process with high parallelism
high_results = run_pool(process_items, items, 4, high_par)
# Process with low parallelism
low_results = run_pool(process_items, items, 2, low_par)
通过巧妙地使用同步原语,尝试在一个 Pool
中做到这一点是可能的,但我认为它最终看起来不会比这更干净。它也可能最终 运行 效率降低,因为有时您的池需要等待工作完成,因此它可以处理低并行度项目,即使队列中它后面有高并行度项目。
如果您需要从每个 process_items
调用中获取与它们落在原始可迭代对象中的顺序相同的结果,这会变得有点复杂,这意味着每个 Pool
的结果需要合并,但根据您的示例,我认为这不是必需的。如果是,请告诉我,我会相应地调整我的答案。