在 python 中使用多处理工具的方法比没有它的方法效果更差
Method with multiprocessing tool in python works worse than methoud without that
我正在尝试使用多处理来加快处理大量文件的速度,而不是一个一个地读取它们。在那之前我做了一个测试来学习。下面是我的代码:
from multiprocessing.pool import Pool
from time import sleep, time
def print_cube(num):
aa1 = num * num
aa2 = num * num * num
return aa1, aa2
def main1():
start = time()
x = []
y = []
p = Pool(16)
for j in range(1, 5):
results = p.apply_async(print_cube, args = (j, ))
x.append(results.get()[0])
y.append(results.get()[1])
end = time()
return end - start, x, y
def main2():
start = time()
x = []
y = []
for j in range(1, 5):
results = print_cube(j)
x.append(results[0])
y.append(results[1])
end = time()
return end - start, x, y
if __name__ == "__main__":
print("Method1{0}time : {1}{2}x : {3}{4}y : {5}".format('\n' ,main1()[0], '\n', main1()[1], '\n', main1()[2]))
print("Method2{0}time : {1:.6f}{2}x : {3}{4}y : {5}".format('\n' ,main2()[0], '\n', main2()[1], '\n', main2()[2]))
结果是:
Method1
time : 0.1549079418182373
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method2
time : 0.000000
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
方法 1 使用多处理,消耗更多 CPU,但比方法 2 花费更多时间。
即使循环次数j达到5000或更多,方法2的效果也比方法1好。谁能告诉我我的代码有什么问题吗?
使用多处理会产生您没有的开销,例如 (1) 创建进程,(2) 将参数传递给辅助函数,在不同的进程中是 运行 以及 (3)将结果传递回您的主要流程。因此,worker 函数必须足够 CPU-intensive 以便您通过 运行 它并行获得的收益抵消我刚才提到的额外开销。您的工作函数 print_cube
不符合该标准,因为它不够 CPU-intensive.
但你甚至不是 运行 你的并行工作函数。
您正在通过调用方法 multiprocessing.pool.Pool.apply_async
循环提交任务,其中 returns 是 multiprocessing.pool.AsyncResult
的一个实例,但在您再次调用 apply_async
提交下一个之前您在 AsyncResult
上调用方法 get
的任务,因此会阻塞,直到第一个任务完成并且 returns 在您提交第二个任务之前得到结果!!!您必须使用 apply_async
提交 所有 任务并 保存 返回的 AsyncResult
个实例,然后才调用 get
在这些情况下。只有这样你才能实现并行。即便如此,您的工作函数 print_cube
使用太少 CPU 来克服多处理使用的额外开销比串行处理更高效。
在下面的代码中,我 (1) 更正了多处理代码以执行并行并创建一个大小为 5 的池(没有理由创建一个进程数超过您要提交的任务数的池或纯粹 CPU-bound 任务的 CPU 处理器数量;这只是您无缘无故造成的额外开销)和 (2) 将 print_cube
修改为非常 CPU-intensive 展示多处理的优势(尽管是人为的方式):
from multiprocessing.pool import Pool
from time import sleep, time
def print_cube(num):
# emulate a CPU-intensive calculation:
for _ in range(10_000_000):
aa1 = num * num
aa2 = num * num * num
return aa1, aa2
def main1():
start = time()
x = []
y = []
p = Pool(5)
# Submit all the tasks and save the AsyncResult instances:
results = [p.apply_async(print_cube, args = (j, )) for j in range(1, 5)]
# Now wait for the return values:
for result in results:
# Unpack the tuple:
x_value, y_value = result.get()
x.append(x_value)
y.append(y_value)
end = time()
return end - start, x, y
def main2():
start = time()
x = []
y = []
for j in range(1, 5):
results = print_cube(j)
x.append(results[0])
y.append(results[1])
end = time()
return end - start, x, y
if __name__ == "__main__":
print("Method1{0}time : {1}{2}x : {3}{4}y : {5}".format('\n' ,main1()[0], '\n', main1()[1], '\n', main1()[2]))
print("Method2{0}time : {1:.6f}{2}x : {3}{4}y : {5}".format('\n' ,main2()[0], '\n', main2()[1], '\n', main2()[2]))
打印:
Method1
time : 1.109999656677246
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method2
time : 2.827015
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
重要提示
除非您有固态驱动器,否则您可能会发现尝试并行读取多个文件可能会 counter-productive 因为头部来回移动。这也可能是多线程的工作better-suited。
@Booboo 首先,非常感谢您详细而精彩的解释。它帮助我更好地理解 python 的多处理工具,你的代码也是一个很好的例子。而下次尝试应用multiprocessing时,我想我会首先考虑任务是否满足你说的multiprocessing的特性。抱歉回复晚了,我 运行 做了一些实验。
其次,我 运行 你在我的电脑上给出的代码,它显示了与你的相似的结果,其中方法 1 确实比方法 2 花费更少的时间和更高的 CPU 消耗。
Method1
time : 1.0751237869262695
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method2
time : 3.642306
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
第三,关于你写的note,数据文件存储在固态硬盘中,我测试了方法1处理大约50 * 100 MB csv文件的时间和CPU消耗(多处理)、Method2(无)和 Method3(多线程)。 Method2 确实消耗了很高百分比的 CPU,50%,但没有像 Method1 那样达到最大值。结果如下:
time : 12.527468204498291
time : 59.400668144226074
time : 35.45922660827637
第四,下面是模拟CPU-intensive计算的例子:
import threading
from multiprocessing.pool import Pool
from queue import Queue
from time import time
def print_cube(num):
# emulate a CPU-intensive calculation:
for _ in range(10_000_000_0):
aa1 = num * num
aa2 = num * num * num
return aa1, aa2
def print_cube_queue(num, q):
# emulate a CPU-intensive calculation:
for _ in range(10_000_000_0):
aa1 = num * num
aa2 = num * num * num
q.put((aa1, aa2))
def main1():
start = time()
x = []
y = []
p = Pool(8)
# Submit all the tasks and save the AsyncResult instances:
results = [p.apply_async(print_cube, args = (j, )) for j in range(1, 5)]
# Now wait for the return values:
for result in results:
# Unpack the tuple:
x_value, y_value = result.get()
x.append(x_value)
y.append(y_value)
end = time()
return end - start, x, y
def main2():
start = time()
x = []
y = []
for j in range(1, 5):
results = print_cube(j)
x.append(results[0])
y.append(results[1])
end = time()
return end - start, x, y
def main3():
start = time()
q = Queue()
x = []
y = []
threads = []
for j in range(1, 5):
t = threading.Thread(target=print_cube_queue, args = (j, q))
t.start()
threads.append(t)
for thread in threads:
thread.join()
results = []
for thread in threads:
x_value, y_value = q.get()
x.append(x_value)
y.append(y_value) #q.get()按顺序从q中拿出一个值
end = time()
return end - start, x, y
if __name__ == "__main__":
print("Method1{0}time : {1}{2}x : {3}{4}y : {5}".format('\n' ,main1()[0], '\n', main1()[1], '\n', main1()[2]))
print("Method2{0}time : {1:.6f}{2}x : {3}{4}y : {5}".format('\n' ,main2()[0], '\n', main2()[1], '\n', main2()[2]))
print("Method3{0}time : {1:.6f}{2}x : {3}{4}y : {5}".format('\n' ,main3()[0], '\n', main3()[1], '\n', main3()[2]))
结果是:
Method1
time : 9.838010549545288
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method2
time : 35.850124
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method3
time : 37.191602
x : [4, 16, 9, 1]
y : [8, 1, 64, 27]
查了下,不知道是GIL的原因还是其他的。
我正在尝试使用多处理来加快处理大量文件的速度,而不是一个一个地读取它们。在那之前我做了一个测试来学习。下面是我的代码:
from multiprocessing.pool import Pool
from time import sleep, time
def print_cube(num):
aa1 = num * num
aa2 = num * num * num
return aa1, aa2
def main1():
start = time()
x = []
y = []
p = Pool(16)
for j in range(1, 5):
results = p.apply_async(print_cube, args = (j, ))
x.append(results.get()[0])
y.append(results.get()[1])
end = time()
return end - start, x, y
def main2():
start = time()
x = []
y = []
for j in range(1, 5):
results = print_cube(j)
x.append(results[0])
y.append(results[1])
end = time()
return end - start, x, y
if __name__ == "__main__":
print("Method1{0}time : {1}{2}x : {3}{4}y : {5}".format('\n' ,main1()[0], '\n', main1()[1], '\n', main1()[2]))
print("Method2{0}time : {1:.6f}{2}x : {3}{4}y : {5}".format('\n' ,main2()[0], '\n', main2()[1], '\n', main2()[2]))
结果是:
Method1
time : 0.1549079418182373
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method2
time : 0.000000
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
方法 1 使用多处理,消耗更多 CPU,但比方法 2 花费更多时间。
即使循环次数j达到5000或更多,方法2的效果也比方法1好。谁能告诉我我的代码有什么问题吗?
使用多处理会产生您没有的开销,例如 (1) 创建进程,(2) 将参数传递给辅助函数,在不同的进程中是 运行 以及 (3)将结果传递回您的主要流程。因此,worker 函数必须足够 CPU-intensive 以便您通过 运行 它并行获得的收益抵消我刚才提到的额外开销。您的工作函数 print_cube
不符合该标准,因为它不够 CPU-intensive.
但你甚至不是 运行 你的并行工作函数。
您正在通过调用方法 multiprocessing.pool.Pool.apply_async
循环提交任务,其中 returns 是 multiprocessing.pool.AsyncResult
的一个实例,但在您再次调用 apply_async
提交下一个之前您在 AsyncResult
上调用方法 get
的任务,因此会阻塞,直到第一个任务完成并且 returns 在您提交第二个任务之前得到结果!!!您必须使用 apply_async
提交 所有 任务并 保存 返回的 AsyncResult
个实例,然后才调用 get
在这些情况下。只有这样你才能实现并行。即便如此,您的工作函数 print_cube
使用太少 CPU 来克服多处理使用的额外开销比串行处理更高效。
在下面的代码中,我 (1) 更正了多处理代码以执行并行并创建一个大小为 5 的池(没有理由创建一个进程数超过您要提交的任务数的池或纯粹 CPU-bound 任务的 CPU 处理器数量;这只是您无缘无故造成的额外开销)和 (2) 将 print_cube
修改为非常 CPU-intensive 展示多处理的优势(尽管是人为的方式):
from multiprocessing.pool import Pool
from time import sleep, time
def print_cube(num):
# emulate a CPU-intensive calculation:
for _ in range(10_000_000):
aa1 = num * num
aa2 = num * num * num
return aa1, aa2
def main1():
start = time()
x = []
y = []
p = Pool(5)
# Submit all the tasks and save the AsyncResult instances:
results = [p.apply_async(print_cube, args = (j, )) for j in range(1, 5)]
# Now wait for the return values:
for result in results:
# Unpack the tuple:
x_value, y_value = result.get()
x.append(x_value)
y.append(y_value)
end = time()
return end - start, x, y
def main2():
start = time()
x = []
y = []
for j in range(1, 5):
results = print_cube(j)
x.append(results[0])
y.append(results[1])
end = time()
return end - start, x, y
if __name__ == "__main__":
print("Method1{0}time : {1}{2}x : {3}{4}y : {5}".format('\n' ,main1()[0], '\n', main1()[1], '\n', main1()[2]))
print("Method2{0}time : {1:.6f}{2}x : {3}{4}y : {5}".format('\n' ,main2()[0], '\n', main2()[1], '\n', main2()[2]))
打印:
Method1
time : 1.109999656677246
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method2
time : 2.827015
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
重要提示
除非您有固态驱动器,否则您可能会发现尝试并行读取多个文件可能会 counter-productive 因为头部来回移动。这也可能是多线程的工作better-suited。
@Booboo 首先,非常感谢您详细而精彩的解释。它帮助我更好地理解 python 的多处理工具,你的代码也是一个很好的例子。而下次尝试应用multiprocessing时,我想我会首先考虑任务是否满足你说的multiprocessing的特性。抱歉回复晚了,我 运行 做了一些实验。
其次,我 运行 你在我的电脑上给出的代码,它显示了与你的相似的结果,其中方法 1 确实比方法 2 花费更少的时间和更高的 CPU 消耗。
Method1
time : 1.0751237869262695
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method2
time : 3.642306
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
第三,关于你写的note,数据文件存储在固态硬盘中,我测试了方法1处理大约50 * 100 MB csv文件的时间和CPU消耗(多处理)、Method2(无)和 Method3(多线程)。 Method2 确实消耗了很高百分比的 CPU,50%,但没有像 Method1 那样达到最大值。结果如下:
time : 12.527468204498291
time : 59.400668144226074
time : 35.45922660827637
第四,下面是模拟CPU-intensive计算的例子:
import threading
from multiprocessing.pool import Pool
from queue import Queue
from time import time
def print_cube(num):
# emulate a CPU-intensive calculation:
for _ in range(10_000_000_0):
aa1 = num * num
aa2 = num * num * num
return aa1, aa2
def print_cube_queue(num, q):
# emulate a CPU-intensive calculation:
for _ in range(10_000_000_0):
aa1 = num * num
aa2 = num * num * num
q.put((aa1, aa2))
def main1():
start = time()
x = []
y = []
p = Pool(8)
# Submit all the tasks and save the AsyncResult instances:
results = [p.apply_async(print_cube, args = (j, )) for j in range(1, 5)]
# Now wait for the return values:
for result in results:
# Unpack the tuple:
x_value, y_value = result.get()
x.append(x_value)
y.append(y_value)
end = time()
return end - start, x, y
def main2():
start = time()
x = []
y = []
for j in range(1, 5):
results = print_cube(j)
x.append(results[0])
y.append(results[1])
end = time()
return end - start, x, y
def main3():
start = time()
q = Queue()
x = []
y = []
threads = []
for j in range(1, 5):
t = threading.Thread(target=print_cube_queue, args = (j, q))
t.start()
threads.append(t)
for thread in threads:
thread.join()
results = []
for thread in threads:
x_value, y_value = q.get()
x.append(x_value)
y.append(y_value) #q.get()按顺序从q中拿出一个值
end = time()
return end - start, x, y
if __name__ == "__main__":
print("Method1{0}time : {1}{2}x : {3}{4}y : {5}".format('\n' ,main1()[0], '\n', main1()[1], '\n', main1()[2]))
print("Method2{0}time : {1:.6f}{2}x : {3}{4}y : {5}".format('\n' ,main2()[0], '\n', main2()[1], '\n', main2()[2]))
print("Method3{0}time : {1:.6f}{2}x : {3}{4}y : {5}".format('\n' ,main3()[0], '\n', main3()[1], '\n', main3()[2]))
结果是:
Method1
time : 9.838010549545288
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method2
time : 35.850124
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method3
time : 37.191602
x : [4, 16, 9, 1]
y : [8, 1, 64, 27]
查了下,不知道是GIL的原因还是其他的。