在 python 中使用多处理工具的方法比没有它的方法效果更差

Method with multiprocessing tool in python works worse than methoud without that

我正在尝试使用多处理来加快处理大量文件的速度,而不是一个一个地读取它们。在那之前我做了一个测试来学习。下面是我的代码:

from multiprocessing.pool import Pool
from time import sleep, time

def print_cube(num):
    aa1 = num * num
    aa2 = num * num * num
    return aa1, aa2


def main1():
    start = time()
    x = []
    y = []
    p = Pool(16)
    for j in range(1, 5):
        results = p.apply_async(print_cube, args = (j, ))
        x.append(results.get()[0])
        y.append(results.get()[1])
    end = time()
    return end - start, x, y 
    
def main2():
    start = time()
    x = []
    y = []
    for j in range(1, 5):
        results = print_cube(j)
        x.append(results[0])
        y.append(results[1])
    end = time()
    return end - start, x, y

if __name__ == "__main__":
    print("Method1{0}time : {1}{2}x : {3}{4}y : {5}".format('\n' ,main1()[0], '\n', main1()[1], '\n', main1()[2]))
    print("Method2{0}time : {1:.6f}{2}x : {3}{4}y : {5}".format('\n' ,main2()[0], '\n', main2()[1], '\n', main2()[2]))

结果是:

Method1
time : 0.1549079418182373
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method2
time : 0.000000
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]

方法 1 使用多处理,消耗更多 CPU,但比方法 2 花费更多时间。

即使循环次数j达到5000或更多,方法2的效果也比方法1好。谁能告诉我我的代码有什么问题吗?

使用多处理会产生您没有的开销,例如 (1) 创建进程,(2) 将参数传递给辅助函数,在不同的进程中是 运行 以及 (3)将结果传递回您的主要流程。因此,worker 函数必须足够 CPU-intensive 以便您通过 运行 它并行获得的收益抵消我刚才提到的额外开销。您的工作函数 print_cube 不符合该标准,因为它不够 CPU-intensive.

但你甚至不是 运行 你的并行工作函数。

您正在通过调用方法 multiprocessing.pool.Pool.apply_async 循环提交任务,其中 returns 是 multiprocessing.pool.AsyncResult 的一个实例,但在您再次调用 apply_async 提交下一个之前您在 AsyncResult 上调用方法 get 的任务,因此会阻塞,直到第一个任务完成并且 returns 在您提交第二个任务之前得到结果!!!您必须使用 apply_async 提交 所有 任务并 保存 返回的 AsyncResult 个实例,然后才调用 get 在这些情况下。只有这样你才能实现并行。即便如此,您的工作函数 print_cube 使用太少 CPU 来克服多处理使用的额外开销比串行处理更高效。

在下面的代码中,我 (1) 更正了多处理代码以执行并行并创建一个大小为 5 的池(没有理由创建一个进程数超过您要提交的任务数的池或纯粹 CPU-bound 任务的 CPU 处理器数量;这只是您无缘无故造成的额外开销)和 (2) 将 print_cube 修改为非常 CPU-intensive 展示多处理的优势(尽管是人为的方式):

from multiprocessing.pool import Pool
from time import sleep, time

def print_cube(num):
    # emulate a CPU-intensive calculation:
    for _ in range(10_000_000):
        aa1 = num * num
        aa2 = num * num * num
    return aa1, aa2


def main1():
    start = time()
    x = []
    y = []
    p = Pool(5)
    # Submit all the tasks and save the AsyncResult instances:
    results = [p.apply_async(print_cube, args = (j, )) for j in range(1, 5)]
    # Now wait for the return values:
    for result in results:
        # Unpack the tuple:
        x_value, y_value = result.get()
        x.append(x_value)
        y.append(y_value)
    end = time()
    return end - start, x, y

def main2():
    start = time()
    x = []
    y = []
    for j in range(1, 5):
        results = print_cube(j)
        x.append(results[0])
        y.append(results[1])
    end = time()
    return end - start, x, y

if __name__ == "__main__":
    print("Method1{0}time : {1}{2}x : {3}{4}y : {5}".format('\n' ,main1()[0], '\n', main1()[1], '\n', main1()[2]))
    print("Method2{0}time : {1:.6f}{2}x : {3}{4}y : {5}".format('\n' ,main2()[0], '\n', main2()[1], '\n', main2()[2]))

打印:

Method1
time : 1.109999656677246
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method2
time : 2.827015
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]

重要提示

除非您有固态驱动器,否则您可能会发现尝试并行读取多个文件可能会 counter-productive 因为头部来回移动。这也可能是多线程的工作better-suited。

@Booboo 首先,非常感谢您详细而精彩的解释。它帮助我更好地理解 python 的多处理工具,你的代码也是一个很好的例子。而下次尝试应用multiprocessing时,我想我会首先考虑任务是否满足你说的multiprocessing的特性。抱歉回复晚了,我 运行 做了一些实验。

其次,我 运行 你在我的电脑上给出的代码,它显示了与你的相似的结果,其中方法 1 确实比方法 2 花费更少的时间和更高的 CPU 消耗。

Method1
time : 1.0751237869262695
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method2
time : 3.642306
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]

第三,关于你写的note,数据文件存储在固态硬盘中,我测试了方法1处理大约50 * 100 MB csv文件的时间和CPU消耗(多处理)、Method2(无)和 Method3(多线程)。 Method2 确实消耗了很高百分比的 CPU,50%,但没有像 Method1 那样达到最大值。结果如下:

time : 12.527468204498291
time : 59.400668144226074
time : 35.45922660827637

第四,下面是模拟CPU-intensive计算的例子:

import threading
from multiprocessing.pool import Pool
from queue import Queue
from time import time

def print_cube(num):
    # emulate a CPU-intensive calculation:
    for _ in range(10_000_000_0):
        aa1 = num * num
        aa2 = num * num * num
    return aa1, aa2

def print_cube_queue(num, q):
    # emulate a CPU-intensive calculation:
    for _ in range(10_000_000_0):
        aa1 = num * num
        aa2 = num * num * num
    q.put((aa1, aa2))

def main1():
    start = time()
    x = []
    y = []
    p = Pool(8)
    # Submit all the tasks and save the AsyncResult instances:
    results = [p.apply_async(print_cube, args = (j, )) for j in range(1, 5)]
    # Now wait for the return values:
    for result in results:
        # Unpack the tuple:
        x_value, y_value = result.get()
        x.append(x_value)
        y.append(y_value)
    end = time()
    return end - start, x, y

def main2():
    start = time()
    x = []
    y = []
    for j in range(1, 5):
        results = print_cube(j)
        x.append(results[0])
        y.append(results[1])
    end = time()
    return end - start, x, y

def main3():
    start = time()
    q = Queue()
    x = []
    y = []
    threads = []
    for j in range(1, 5):
        t = threading.Thread(target=print_cube_queue, args = (j, q))
        t.start()
        threads.append(t)
    for thread in threads:
        thread.join()
    results = []
    for thread in threads:
        x_value, y_value = q.get()
        x.append(x_value)
        y.append(y_value) #q.get()按顺序从q中拿出一个值
    end = time()
    return end - start, x, y


if __name__ == "__main__":
    print("Method1{0}time : {1}{2}x : {3}{4}y : {5}".format('\n' ,main1()[0], '\n', main1()[1], '\n', main1()[2]))
    print("Method2{0}time : {1:.6f}{2}x : {3}{4}y : {5}".format('\n' ,main2()[0], '\n', main2()[1], '\n', main2()[2]))
    print("Method3{0}time : {1:.6f}{2}x : {3}{4}y : {5}".format('\n' ,main3()[0], '\n', main3()[1], '\n', main3()[2]))

结果是:

Method1
time : 9.838010549545288
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method2
time : 35.850124
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method3
time : 37.191602
x : [4, 16, 9, 1]
y : [8, 1, 64, 27]

查了下,不知道是GIL的原因还是其他的。