Python multiprocessing:如何知道使用Pool还是Process?
Python multiprocessing: How to know to use Pool or Process?
所以我有一个正在编写的算法,函数 multiprocess
应该在与 cpu 一样多的进程上并行调用另一个函数 CreateMatrixMp()
。我以前从未做过多处理,也不能确定下面哪一种方法更有效。在函数 CreateMatrixMp()
的上下文中使用的单词 "efficient" 可能需要被调用,成千上万的 times.I 已阅读 python multiprocessing
上的所有文档模块,并得出这两种可能性:
首先是使用 Pool
class:
def MatrixHelper(self, args):
return self.CreateMatrix(*args)
def Multiprocess(self, sigmaI, sigmaX):
cpus = mp.cpu_count()
print('Number of cpu\'s to process WM: %d' % cpus)
poolCount = cpus*2
args = [(sigmaI, sigmaX, i) for i in range(self.numPixels)]
pool = mp.Pool(processes = poolCount, maxtasksperchild= 2)
tempData = pool.map(self.MatrixHelper, args)
pool.close()
pool.join()
接下来是使用 Process
class:
def Multiprocess(self, sigmaI, sigmaX):
cpus = mp.cpu_count()
print('Number of cpu\'s to process WM: %d' % cpus)
processes = [mp.Process(target = self.CreateMatrixMp, args = (sigmaI, sigmaX, i,)) for i in range(self.numPixels)]
for p in processes:
p.start()
for p in processes:
p.join()
Pool
似乎是更好的选择。我读过它会减少开销。而Process
并没有考虑机器上的cpus个数。唯一的问题是,以这种方式使用 Pool
会给我一个又一个错误,每当我修复一个错误时,它下面就会有一个新错误。 Process
似乎更容易实现,据我所知,这可能是更好的选择。你的经历告诉你什么?
如果应该用Pool
,那我选map()
对吗?最好维持秩序。我有 tempData = pool.map(...)
因为 map
函数应该 return 每个过程的结果列表。我不确定 Process
如何处理其 returned 数据。
我认为 Pool
class 通常更方便,但这取决于您希望结果是有序的还是无序的。
假设您想创建 4 个随机字符串(例如,可以是随机用户 ID 生成器):
import multiprocessing as mp
import random
import string
# Define an output queue
output = mp.Queue()
# define a example function
def rand_string(length, output):
""" Generates a random string of numbers, lower- and uppercase chars. """
rand_str = ''.join(random.choice(
string.ascii_lowercase
+ string.ascii_uppercase
+ string.digits)
for i in range(length))
output.put(rand_str)
# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(5, output)) for x in range(4)]
# Run processes
for p in processes:
p.start()
# Exit the completed processes
for p in processes:
p.join()
# Get process results from the output queue
results = [output.get() for p in processes]
print(results)
# Output
# ['yzQfA', 'PQpqM', 'SHZYV', 'PSNkD']
在这里,顺序可能无关紧要。我不确定是否有更好的方法,但是如果我想按照调用函数的顺序跟踪结果,我通常 return 元组将 ID 作为第一项,例如,
# define a example function
def rand_string(length, pos, output):
""" Generates a random string of numbers, lower- and uppercase chars. """
rand_str = ''.join(random.choice(
string.ascii_lowercase
+ string.ascii_uppercase
+ string.digits)
for i in range(length))
output.put((pos, rand_str))
# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(5, x, output)) for x in range(4)]
print(processes)
# Output
# [(1, '5lUya'), (3, 'QQvLr'), (0, 'KAQo6'), (2, 'nj6Q0')]
这让我对结果进行排序:
results.sort()
results = [r[1] for r in results]
print(results)
# Output:
# ['KAQo6', '5lUya', 'nj6Q0', 'QQvLr']
游泳池class
现在回答您的问题:这与 Pool
class 有何不同?
您通常更喜欢 Pool.map
而不是 return 有序的结果列表,而无需经历创建元组并按 ID 对它们进行排序的过程。因此,我会说它通常更有效率。
def cube(x):
return x**3
pool = mp.Pool(processes=4)
results = pool.map(cube, range(1,7))
print(results)
# output:
# [1, 8, 27, 64, 125, 216]
等价的还有一个"apply"方法:
pool = mp.Pool(processes=4)
results = [pool.apply(cube, args=(x,)) for x in range(1,7)]
print(results)
# output:
# [1, 8, 27, 64, 125, 216]
Pool.apply
和 Pool.map
都会锁定主程序,直到进程完成。
现在,您还有 Pool.apply_async
和 Pool.map_async
,其中 return 过程完成后立即得到结果,这在本质上类似于 Process
class 以上。优点可能是它们为您提供了方便的 apply
和 map
功能,您从 Python 的内置 apply
和 map
您可以使用 pypeln 轻松做到这一点:
import pypeln as pl
stage = pl.process.map(
CreateMatrixMp,
range(self.numPixels),
workers=poolCount,
maxsize=2,
)
# iterate over it in the main process
for x in stage:
# code
# or convert it to a list
data = list(stage)
所以我有一个正在编写的算法,函数 multiprocess
应该在与 cpu 一样多的进程上并行调用另一个函数 CreateMatrixMp()
。我以前从未做过多处理,也不能确定下面哪一种方法更有效。在函数 CreateMatrixMp()
的上下文中使用的单词 "efficient" 可能需要被调用,成千上万的 times.I 已阅读 python multiprocessing
上的所有文档模块,并得出这两种可能性:
首先是使用 Pool
class:
def MatrixHelper(self, args):
return self.CreateMatrix(*args)
def Multiprocess(self, sigmaI, sigmaX):
cpus = mp.cpu_count()
print('Number of cpu\'s to process WM: %d' % cpus)
poolCount = cpus*2
args = [(sigmaI, sigmaX, i) for i in range(self.numPixels)]
pool = mp.Pool(processes = poolCount, maxtasksperchild= 2)
tempData = pool.map(self.MatrixHelper, args)
pool.close()
pool.join()
接下来是使用 Process
class:
def Multiprocess(self, sigmaI, sigmaX):
cpus = mp.cpu_count()
print('Number of cpu\'s to process WM: %d' % cpus)
processes = [mp.Process(target = self.CreateMatrixMp, args = (sigmaI, sigmaX, i,)) for i in range(self.numPixels)]
for p in processes:
p.start()
for p in processes:
p.join()
Pool
似乎是更好的选择。我读过它会减少开销。而Process
并没有考虑机器上的cpus个数。唯一的问题是,以这种方式使用 Pool
会给我一个又一个错误,每当我修复一个错误时,它下面就会有一个新错误。 Process
似乎更容易实现,据我所知,这可能是更好的选择。你的经历告诉你什么?
如果应该用Pool
,那我选map()
对吗?最好维持秩序。我有 tempData = pool.map(...)
因为 map
函数应该 return 每个过程的结果列表。我不确定 Process
如何处理其 returned 数据。
我认为 Pool
class 通常更方便,但这取决于您希望结果是有序的还是无序的。
假设您想创建 4 个随机字符串(例如,可以是随机用户 ID 生成器):
import multiprocessing as mp
import random
import string
# Define an output queue
output = mp.Queue()
# define a example function
def rand_string(length, output):
""" Generates a random string of numbers, lower- and uppercase chars. """
rand_str = ''.join(random.choice(
string.ascii_lowercase
+ string.ascii_uppercase
+ string.digits)
for i in range(length))
output.put(rand_str)
# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(5, output)) for x in range(4)]
# Run processes
for p in processes:
p.start()
# Exit the completed processes
for p in processes:
p.join()
# Get process results from the output queue
results = [output.get() for p in processes]
print(results)
# Output
# ['yzQfA', 'PQpqM', 'SHZYV', 'PSNkD']
在这里,顺序可能无关紧要。我不确定是否有更好的方法,但是如果我想按照调用函数的顺序跟踪结果,我通常 return 元组将 ID 作为第一项,例如,
# define a example function
def rand_string(length, pos, output):
""" Generates a random string of numbers, lower- and uppercase chars. """
rand_str = ''.join(random.choice(
string.ascii_lowercase
+ string.ascii_uppercase
+ string.digits)
for i in range(length))
output.put((pos, rand_str))
# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(5, x, output)) for x in range(4)]
print(processes)
# Output
# [(1, '5lUya'), (3, 'QQvLr'), (0, 'KAQo6'), (2, 'nj6Q0')]
这让我对结果进行排序:
results.sort()
results = [r[1] for r in results]
print(results)
# Output:
# ['KAQo6', '5lUya', 'nj6Q0', 'QQvLr']
游泳池class
现在回答您的问题:这与 Pool
class 有何不同?
您通常更喜欢 Pool.map
而不是 return 有序的结果列表,而无需经历创建元组并按 ID 对它们进行排序的过程。因此,我会说它通常更有效率。
def cube(x):
return x**3
pool = mp.Pool(processes=4)
results = pool.map(cube, range(1,7))
print(results)
# output:
# [1, 8, 27, 64, 125, 216]
等价的还有一个"apply"方法:
pool = mp.Pool(processes=4)
results = [pool.apply(cube, args=(x,)) for x in range(1,7)]
print(results)
# output:
# [1, 8, 27, 64, 125, 216]
Pool.apply
和 Pool.map
都会锁定主程序,直到进程完成。
现在,您还有 Pool.apply_async
和 Pool.map_async
,其中 return 过程完成后立即得到结果,这在本质上类似于 Process
class 以上。优点可能是它们为您提供了方便的 apply
和 map
功能,您从 Python 的内置 apply
和 map
您可以使用 pypeln 轻松做到这一点:
import pypeln as pl
stage = pl.process.map(
CreateMatrixMp,
range(self.numPixels),
workers=poolCount,
maxsize=2,
)
# iterate over it in the main process
for x in stage:
# code
# or convert it to a list
data = list(stage)