在 class 中使用 multiprocess.Pool.map
use multiprocess.Pool.map in a class
from multiprocessing import Pool
class Acc:
def __init__(self):
self.count = 0
def multiprocess(self):
pool = Pool(processes=4)
result = pool.map(self.run, [1]*30)
pool.close()
pool.join()
def run(self, i):
self.count += i
return self.count
a = Acc()
a.multiprocess()
print(a.count)
我想输出应该是 30,但它是 0。我不知道 multiprocess.Pool.map
是如何工作的以及它如何与 class 合作。请详细告诉我。
顺便说一下,如果我在里面打印 self.count 就像
def run(self, i):
print(self.count)
self.count += i
return self.count
给出
0
1
0
1
00
1
10
1
00
11
0
1
00
1001
11
0
10
10
1
比较迷惑,为什么0和1混在一起
如果必须使用多处理,我会遵循以下方法。
因为我们想 运行 我们的代码并行,所以我不希望在 map
.
中传递实例方法
我会将 run
转换为函数而不是方法。这将与参数和 return 相同。
def run(i):
return i
然后在 multiprocess
方法中,我将循环获取 pool.map
的 return 值,然后添加到 self.count
def multiprocess(self):
pool = Pool(processes=4)
for r_value in pool.map(run, [1]*30):
self.count += r_value
pool.close()
pool.join()
输出为
30
Process finished with exit code 0
完整代码:
from multiprocessing import Pool
def run(i):
return i
class Acc:
def __init__(self):
self.count = 0
def multiprocess(self):
pool = Pool(processes=4)
for r_value in pool.map(run, [1]*30):
self.count += r_value
pool.close()
pool.join()
if __name__ =='__main__':
a = Acc()
a.multiprocess()
print(a.count)
首先让我们通过在打印语句中添加 flush=True
让打印输出更有序一些,这样每个打印输出都占据自己的行:
from multiprocessing import Pool
class Acc:
def __init__(self):
self.count = 0
def multiprocess(self):
pool = Pool(processes=4)
result = pool.map(self.run, [1]*30)
pool.close()
pool.join()
def run(self, i):
print('i =', self.count, flush=True)
self.count += i
return self.count
if __name__ == '__main__':
a = Acc()
a.multiprocess()
print('a.count =', a.count)
打印:
i = 0
i = 1
i = 0
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 1
i = 0
i = 1
i = 1
a.count = 0
分析
现在让我们来分析一下这是怎么回事。 a = Acc()
的创建由主进程完成。正在执行的多处理池进程是一个不同的地址 space,因此当它们执行您的工作函数 self.run
时,对象 a
必须 serialized/de-serialized 到地址 space将执行 worker 函数的进程。在那个新地址 space self.count
遇到了初始值 0,它被打印出来,然后递增到 1 和 returned。同时,并行地,对象 a
再被 serialized/de-serialized 3 次,因此其他 3 个进程可以执行相同的处理,它们也将打印 0 和 return 值 1。但是由于所有这些递增都发生在地址 space 中的 a
的副本上,而不是主进程的地址 space,因此主进程中的原始 a
保持不变.所以随着map
函数继续执行,a
进一步从主进程复制到处理池中,它总是与self.count = 0
.
那么问题就变成了为什么有时会打印 i = 1
而不是 i = 0
?
当您使用 iterable 执行 map
指定 30 个元素时,默认情况下,这 30 个任务根据 chunksize 您提供的参数。由于我们采用默认的 chunksize=None,map
函数根据 的长度计算默认的 chunksize
值可迭代 和池大小:
chunksize, remainder = divmod(len(iterable), 4 * pool_size)
if remainder:
chunksize += 1
在此池大小为 4,因此 chunksize
将被计算为 2。这意味着多处理池中的每个进程一次接受任务队列中的两个任务,因此他们使用不同的 i
值(被忽略)两次处理 相同的对象 。
如果我们指定 chunksize 为 1,这样每个进程一次只处理一个对象,那么我们有:
from multiprocessing import Pool
class Acc:
def __init__(self):
self.count = 0
def multiprocess(self):
pool = Pool(processes=4)
result = pool.map(self.run, [1]*30, chunksize=1)
pool.close()
pool.join()
def run(self, i):
print('i =', self.count, flush=True)
self.count += i
return self.count
if __name__ == '__main__':
a = Acc()
a.multiprocess()
print('a.count =', a.count)
打印;
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
a.count = 0
并且如果我们将 chunksize 指定为 30,以便单个进程针对单个对象处理所有任务:
from multiprocessing import Pool
class Acc:
def __init__(self):
self.count = 0
def multiprocess(self):
pool = Pool(processes=4)
result = pool.map(self.run, [1]*30, chunksize=30)
pool.close()
pool.join()
def run(self, i):
print('i =', self.count, flush=True)
self.count += i
return self.count
if __name__ == '__main__':
a = Acc()
a.multiprocess()
print('a.count =', a.count)
打印:
i = 0
i = 1
i = 2
i = 3
i = 4
i = 5
i = 6
i = 7
i = 8
i = 9
i = 10
i = 11
i = 12
i = 13
i = 14
i = 15
i = 16
i = 17
i = 18
i = 19
i = 20
i = 21
i = 22
i = 23
i = 24
i = 25
i = 26
i = 27
i = 28
i = 29
a.count = 0
在最后一个案例中,当然没有发生多处理,因为多处理池的单个进程处理了所有提交的任务。
from multiprocessing import Pool
class Acc:
def __init__(self):
self.count = 0
def multiprocess(self):
pool = Pool(processes=4)
result = pool.map(self.run, [1]*30)
pool.close()
pool.join()
def run(self, i):
self.count += i
return self.count
a = Acc()
a.multiprocess()
print(a.count)
我想输出应该是 30,但它是 0。我不知道 multiprocess.Pool.map
是如何工作的以及它如何与 class 合作。请详细告诉我。
顺便说一下,如果我在里面打印 self.count 就像
def run(self, i):
print(self.count)
self.count += i
return self.count
给出
0
1
0
1
00
1
10
1
00
11
0
1
00
1001
11
0
10
10
1
比较迷惑,为什么0和1混在一起
如果必须使用多处理,我会遵循以下方法。
因为我们想 运行 我们的代码并行,所以我不希望在 map
.
中传递实例方法
我会将 run
转换为函数而不是方法。这将与参数和 return 相同。
def run(i):
return i
然后在 multiprocess
方法中,我将循环获取 pool.map
的 return 值,然后添加到 self.count
def multiprocess(self):
pool = Pool(processes=4)
for r_value in pool.map(run, [1]*30):
self.count += r_value
pool.close()
pool.join()
输出为
30
Process finished with exit code 0
完整代码:
from multiprocessing import Pool
def run(i):
return i
class Acc:
def __init__(self):
self.count = 0
def multiprocess(self):
pool = Pool(processes=4)
for r_value in pool.map(run, [1]*30):
self.count += r_value
pool.close()
pool.join()
if __name__ =='__main__':
a = Acc()
a.multiprocess()
print(a.count)
首先让我们通过在打印语句中添加 flush=True
让打印输出更有序一些,这样每个打印输出都占据自己的行:
from multiprocessing import Pool
class Acc:
def __init__(self):
self.count = 0
def multiprocess(self):
pool = Pool(processes=4)
result = pool.map(self.run, [1]*30)
pool.close()
pool.join()
def run(self, i):
print('i =', self.count, flush=True)
self.count += i
return self.count
if __name__ == '__main__':
a = Acc()
a.multiprocess()
print('a.count =', a.count)
打印:
i = 0
i = 1
i = 0
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 1
i = 0
i = 1
i = 1
a.count = 0
分析
现在让我们来分析一下这是怎么回事。 a = Acc()
的创建由主进程完成。正在执行的多处理池进程是一个不同的地址 space,因此当它们执行您的工作函数 self.run
时,对象 a
必须 serialized/de-serialized 到地址 space将执行 worker 函数的进程。在那个新地址 space self.count
遇到了初始值 0,它被打印出来,然后递增到 1 和 returned。同时,并行地,对象 a
再被 serialized/de-serialized 3 次,因此其他 3 个进程可以执行相同的处理,它们也将打印 0 和 return 值 1。但是由于所有这些递增都发生在地址 space 中的 a
的副本上,而不是主进程的地址 space,因此主进程中的原始 a
保持不变.所以随着map
函数继续执行,a
进一步从主进程复制到处理池中,它总是与self.count = 0
.
那么问题就变成了为什么有时会打印 i = 1
而不是 i = 0
?
当您使用 iterable 执行 map
指定 30 个元素时,默认情况下,这 30 个任务根据 chunksize 您提供的参数。由于我们采用默认的 chunksize=None,map
函数根据 的长度计算默认的 chunksize
值可迭代 和池大小:
chunksize, remainder = divmod(len(iterable), 4 * pool_size)
if remainder:
chunksize += 1
在此池大小为 4,因此 chunksize
将被计算为 2。这意味着多处理池中的每个进程一次接受任务队列中的两个任务,因此他们使用不同的 i
值(被忽略)两次处理 相同的对象 。
如果我们指定 chunksize 为 1,这样每个进程一次只处理一个对象,那么我们有:
from multiprocessing import Pool
class Acc:
def __init__(self):
self.count = 0
def multiprocess(self):
pool = Pool(processes=4)
result = pool.map(self.run, [1]*30, chunksize=1)
pool.close()
pool.join()
def run(self, i):
print('i =', self.count, flush=True)
self.count += i
return self.count
if __name__ == '__main__':
a = Acc()
a.multiprocess()
print('a.count =', a.count)
打印;
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
a.count = 0
并且如果我们将 chunksize 指定为 30,以便单个进程针对单个对象处理所有任务:
from multiprocessing import Pool
class Acc:
def __init__(self):
self.count = 0
def multiprocess(self):
pool = Pool(processes=4)
result = pool.map(self.run, [1]*30, chunksize=30)
pool.close()
pool.join()
def run(self, i):
print('i =', self.count, flush=True)
self.count += i
return self.count
if __name__ == '__main__':
a = Acc()
a.multiprocess()
print('a.count =', a.count)
打印:
i = 0
i = 1
i = 2
i = 3
i = 4
i = 5
i = 6
i = 7
i = 8
i = 9
i = 10
i = 11
i = 12
i = 13
i = 14
i = 15
i = 16
i = 17
i = 18
i = 19
i = 20
i = 21
i = 22
i = 23
i = 24
i = 25
i = 26
i = 27
i = 28
i = 29
a.count = 0
在最后一个案例中,当然没有发生多处理,因为多处理池的单个进程处理了所有提交的任务。