在 class 中使用 multiprocess.Pool.map

use multiprocess.Pool.map in a class

from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30)
        pool.close()
        pool.join()

    def run(self, i):
        self.count += i
        return self.count

a = Acc()
a.multiprocess()
print(a.count)

我想输出应该是 30,但它是 0。我不知道 multiprocess.Pool.map 是如何工作的以及它如何与 class 合作。请详细告诉我。

顺便说一下,如果我在里面打印 self.count 就像

    def run(self, i):
        print(self.count)
        self.count += i
        return self.count

给出

0
1
0
1
00

1
10

1
00

11

0
1
00

1001



11

0
10

10

1

比较迷惑,为什么0和1混在一起

如果必须使用多处理,我会遵循以下方法。 因为我们想 运行 我们的代码并行,所以我不希望在 map.
中传递实例方法 我会将 run 转换为函数而不是方法。这将与参数和 return 相同。

def run(i):
    return i

然后在 multiprocess 方法中,我将循环获取 pool.map 的 return 值,然后添加到 self.count

def multiprocess(self):
    pool = Pool(processes=4)
    for r_value in pool.map(run, [1]*30):
        self.count += r_value
    pool.close()
    pool.join()

输出为

30

Process finished with exit code 0

完整代码:

from multiprocessing import Pool

def run(i):
    return i

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        for r_value in pool.map(run, [1]*30):
            self.count += r_value
        pool.close()
        pool.join()



if __name__ =='__main__':
    a = Acc()
    a.multiprocess()
    print(a.count)

首先让我们通过在打印语句中添加 flush=True 让打印输出更有序一些,这样每个打印输出都占据自己的行:

from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30)
        pool.close()
        pool.join()

    def run(self, i):
        print('i =', self.count, flush=True)
        self.count += i
        return self.count

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)

打印:

i = 0
i = 1
i = 0
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 1
i = 0
i = 1
i = 1
a.count = 0

分析

现在让我们来分析一下这是怎么回事。 a = Acc()的创建由主进程完成。正在执行的多处理池进程是一个不同的地址 space,因此当它们执行您的工作函数 self.run 时,对象 a 必须 serialized/de-serialized 到地址 space将执行 worker 函数的进程。在那个新地址 space self.count 遇到了初始值 0,它被打印出来,然后递增到 1 和 returned。同时,并行地,对象 a 再被 serialized/de-serialized 3 次,因此其他 3 个进程可以执行相同的处理,它们也将打印 0 和 return 值 1。但是由于所有这些递增都发生在地址 space 中的 a 的副本上,而不是主进程的地址 space,因此主进程中的原始 a 保持不变.所以随着map函数继续执行,a进一步从主进程复制到处理池中,它总是与self.count = 0.

那么问题就变成了为什么有时会打印 i = 1 而不是 i = 0

当您使用 iterable 执行 map 指定 30 个元素时,默认情况下,这 30 个任务根据 chunksize 您提供的参数。由于我们采用默认的 chunksize=Nonemap 函数根据 的长度计算默认的 chunksize 值可迭代 和池大小:

chunksize, remainder = divmod(len(iterable), 4 * pool_size)
if remainder:
    chunksize += 1

在此池大小为 4,因此 chunksize 将被计算为 2。这意味着多处理池中的每个进程一次接受任务队列中的两个任务,因此他们使用不同的 i 值(被忽略)两次处理 相同的对象

如果我们指定 chunksize 为 1,这样每个进程一次只处理一个对象,那么我们有:

from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30, chunksize=1)
        pool.close()
        pool.join()

    def run(self, i):
        print('i =', self.count, flush=True)
        self.count += i
        return self.count

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)

打印;

i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
a.count = 0

并且如果我们将 chunksize 指定为 30,以便单个进程针对单个对象处理所有任务:

from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30, chunksize=30)
        pool.close()
        pool.join()

    def run(self, i):
        print('i =', self.count, flush=True)
        self.count += i
        return self.count

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)

打印:

i = 0
i = 1
i = 2
i = 3
i = 4
i = 5
i = 6
i = 7
i = 8
i = 9
i = 10
i = 11
i = 12
i = 13
i = 14
i = 15
i = 16
i = 17
i = 18
i = 19
i = 20
i = 21
i = 22
i = 23
i = 24
i = 25
i = 26
i = 27
i = 28
i = 29
a.count = 0

在最后一个案例中,当然没有发生多处理,因为多处理池的单个进程处理了所有提交的任务。