Python 单工多处理比顺序操作更快
Python multiprocessing with single worker faster than sequential operation
简要概述 - 我将一些包含大量随机数的随机文件写入光盘以测试 python 多处理与顺序操作的性能。
Function description
putfiles : 将测试文件写入驱动器
readFile : 读取传入的文件路径并返回结果(文件中数字的总和)
getSequential : 使用 for 循环读取一些文件
getParallel : 使用生成的多个进程读取文件
Performance results: (Read and process 100 files, with sequential and process pool)
timeit getSequential(numFiles=100) - 最好大约 2.85s
timeit getParallel(numFiles=100, numProcesses=4) - 大约 960 毫秒最好
timeit getParallel(numFiles=100, numProcesses=1) - 大约 980 毫秒最好
令人惊讶的是,单进程池的性能优于顺序进程,与 4 进程池相当。这种行为是预期的还是我在这里做错了什么?
import os
import random
from multiprocessing import Pool
# Switch to the scratch directory holding the generated test files.
# Hard-coded macOS path from the question; os.chdir raises FileNotFoundError
# if the directory does not exist -- adjust before running elsewhere.
os.chdir('/Users/test/Desktop/filewritetest')
def putfiles(numFiles=5, numCount=100):
    """Create the benchmark input files in the current directory.

    Writes numFiles files named r0.txt, r1.txt, ..., each containing
    numCount random integers in [1, 100], one per line.
    """
    for idx in range(numFiles):
        numbers = [str(random.randint(1, 100)) for _ in range(numCount)]
        with open('r' + str(idx) + '.txt', 'w') as out:
            out.write("\n".join(numbers))
def readFile(fileurl):
    """Sum the whitespace-separated integers in fileurl and write the
    total (as a string) to a sibling file named "ans_" + fileurl."""
    with open(fileurl, 'r') as src:
        total = sum(int(tok) for tok in src.read().split())
    with open("ans_" + fileurl, 'w') as dst:
        dst.write(str(total))
def getSequential(numFiles=5):
    """Sequential baseline: process r0.txt .. r{numFiles-1}.txt one
    after another with readFile in the calling process."""
    for idx in range(numFiles):
        readFile('r' + str(idx) + '.txt')
def getParallel(numFiles=5, numProcesses=2):
    """Process r0.txt .. r{numFiles-1}.txt by fanning readFile calls
    out to a pool of numProcesses worker processes."""
    targets = ['r' + str(idx) + '.txt' for idx in range(numFiles)]
    with Pool(numProcesses) as pool:
        pool.map(readFile, targets)
#putfiles()
# NOTE: the lines below are an IPython/Spyder session transcript, not plain
# Python -- the bare "timeit expr" statements are the IPython %timeit magic
# and are a SyntaxError outside such a shell.
putfiles(numFiles=1000, numCount=100000)
timeit getSequential(numFiles=100)
##around 2.85s best
timeit getParallel(numFiles=100, numProcesses=1)
##around 980ms best
timeit getParallel(numFiles=100, numProcesses=4)
##around 960ms best
Update: in a new session of Spyder, I don't see this issue. Updated runtimes below.
# IPython %timeit transcript of the re-run benchmarks (fresh session);
# not plain Python -- runnable only in an IPython/Spyder console.
##100 files
#around 2.97s best
timeit getSequential(numFiles=100)
#around 2.99s best
timeit getParallel(numFiles=100, numProcesses=1)
#around 1.57s best
timeit getParallel(numFiles=100, numProcesses=2)
#around 942ms best
timeit getParallel(numFiles=100, numProcesses=4)
##1000 files
#around 29.3s best
timeit getSequential(numFiles=1000)
#around 11.8s best
timeit getParallel(numFiles=1000, numProcesses=4)
#around 9.6s best
timeit getParallel(numFiles=1000, numProcesses=16)
#around 9.65s best #let pool choose best default value
timeit getParallel(numFiles=1000)
请不要将此视为答案,它只是向您展示我在 Python 3.x 中运行的代码(您的 timeit 用法对我完全不起作用,我猜那是 2.x 的写法)。抱歉,我现在没有时间深入研究它。
[编辑] 在旋转驱动器上,考虑磁盘缓存:不要在不同的测试中访问相同的文件,或者只是切换测试顺序以查看是否涉及磁盘缓存
使用以下代码,手动更改 numProcesses=X 参数,我得到了这些结果:
在 SSD 上:顺序读取 1000 个文件耗时 0.31 秒;1 个进程并行读取 1000 个文件耗时 0.37 秒;4 个进程并行读取 1000 个文件耗时 0.23 秒。
import os
import random
import timeit
from multiprocessing import Pool
from contextlib import closing
# Bug fix: 'c:\temp\' is a SyntaxError -- the trailing backslash escapes the
# closing quote so the string literal never terminates, and '\t' is a tab
# character, not "\t". Use a raw string (or forward slashes) instead.
os.chdir(r'c:\temp')
def putfiles(numFiles=5, numCount=1):
    """Generate the benchmark inputs: write numFiles files named
    r0.txt, r1.txt, ... into the current directory, each holding
    numCount random integers in [1, 100], one per line."""
    for idx in range(numFiles):
        body = "\n".join(str(random.randint(1, 100)) for _ in range(numCount))
        with open('r' + str(idx) + '.txt', 'w') as out:
            out.write(body)
def readFile(fileurl):
    """Read every whitespace-separated integer in fileurl and write
    their sum (stringified) to a file named "ans_" + fileurl."""
    with open(fileurl, 'r') as src:
        tokens = src.read().split()
    result = sum(map(int, tokens))
    with open("ans_" + fileurl, 'w') as dst:
        dst.write(str(result))
def getSequential(numFiles=10000):
    """Sequential baseline: run readFile over r0.txt .. r{numFiles-1}.txt
    in the calling process, one file at a time."""
    names = ('r' + str(i) + '.txt' for i in range(numFiles))
    for name in names:
        readFile(name)
def getParallel(numFiles=10000, numProcesses=1):
    """Process r0.txt .. r{numFiles-1}.txt with a pool of worker processes.

    Bug fix: the pool was created with a hard-coded processes=1, silently
    ignoring the numProcesses argument -- every "parallel" run actually used
    a single worker. Pass the parameter through to Pool instead.
    """
    fileNames = ['r' + str(num) + '.txt' for num in range(numFiles)]
    # closing() guarantees the pool is shut down even if map() raises.
    with closing(Pool(processes=numProcesses)) as p:
        p.map(readFile, fileNames)
# The __main__ guard is required on Windows/macOS (spawn start method) so the
# Pool workers can re-import this module without re-running the benchmark.
if __name__ == '__main__':
#putfiles(numFiles=10000, numCount=1)
# number=1: each benchmark body is executed exactly once per measurement.
print (timeit.timeit ("getSequential()","from __main__ import getSequential",number=1))
print (timeit.timeit ("getParallel()","from __main__ import getParallel",number=1))
#timeit (getParallel(numFiles=100, numProcesses=4)) #-around 960ms best
#timeit (getParallel(numFiles=100, numProcesses=1)) #-around 980ms best
简要概述 - 我将一些包含大量随机数的随机文件写入光盘以测试 python 多处理与顺序操作的性能。
Function description
putfiles : 将测试文件写入驱动器
readFile : 读取传入的文件路径并返回结果(文件中数字的总和)
getSequential : 使用 for 循环读取一些文件
getParallel : 使用生成的多个进程读取文件
Performance results: (Read and process 100 files, with sequential and process pool)
timeit getSequential(numFiles=100) - 最好大约 2.85s
timeit getParallel(numFiles=100, numProcesses=4) - 大约 960 毫秒最好
timeit getParallel(numFiles=100, numProcesses=1) - 大约 980 毫秒最好
令人惊讶的是,单进程池的性能优于顺序进程,与 4 进程池相当。这种行为是预期的还是我在这里做错了什么?
import os
import random
from multiprocessing import Pool
# Switch to the scratch directory holding the generated test files.
# Hard-coded macOS path from the question; os.chdir raises FileNotFoundError
# if the directory does not exist -- adjust before running elsewhere.
os.chdir('/Users/test/Desktop/filewritetest')
def putfiles(numFiles=5, numCount=100):
    """Create the benchmark input files in the current directory.

    Writes numFiles files named r0.txt, r1.txt, ..., each containing
    numCount random integers in [1, 100], one per line.
    """
    for idx in range(numFiles):
        numbers = [str(random.randint(1, 100)) for _ in range(numCount)]
        with open('r' + str(idx) + '.txt', 'w') as out:
            out.write("\n".join(numbers))
def readFile(fileurl):
    """Sum the whitespace-separated integers in fileurl and write the
    total (as a string) to a sibling file named "ans_" + fileurl."""
    with open(fileurl, 'r') as src:
        total = sum(int(tok) for tok in src.read().split())
    with open("ans_" + fileurl, 'w') as dst:
        dst.write(str(total))
def getSequential(numFiles=5):
    """Sequential baseline: process r0.txt .. r{numFiles-1}.txt one
    after another with readFile in the calling process."""
    for idx in range(numFiles):
        readFile('r' + str(idx) + '.txt')
def getParallel(numFiles=5, numProcesses=2):
    """Process r0.txt .. r{numFiles-1}.txt by fanning readFile calls
    out to a pool of numProcesses worker processes."""
    targets = ['r' + str(idx) + '.txt' for idx in range(numFiles)]
    with Pool(numProcesses) as pool:
        pool.map(readFile, targets)
#putfiles()
# NOTE: the lines below are an IPython/Spyder session transcript, not plain
# Python -- the bare "timeit expr" statements are the IPython %timeit magic
# and are a SyntaxError outside such a shell.
putfiles(numFiles=1000, numCount=100000)
timeit getSequential(numFiles=100)
##around 2.85s best
timeit getParallel(numFiles=100, numProcesses=1)
##around 980ms best
timeit getParallel(numFiles=100, numProcesses=4)
##around 960ms best
Update: in a new session of Spyder, I don't see this issue. Updated runtimes below.
# IPython %timeit transcript of the re-run benchmarks (fresh session);
# not plain Python -- runnable only in an IPython/Spyder console.
##100 files
#around 2.97s best
timeit getSequential(numFiles=100)
#around 2.99s best
timeit getParallel(numFiles=100, numProcesses=1)
#around 1.57s best
timeit getParallel(numFiles=100, numProcesses=2)
#around 942ms best
timeit getParallel(numFiles=100, numProcesses=4)
##1000 files
#around 29.3s best
timeit getSequential(numFiles=1000)
#around 11.8s best
timeit getParallel(numFiles=1000, numProcesses=4)
#around 9.6s best
timeit getParallel(numFiles=1000, numProcesses=16)
#around 9.65s best #let pool choose best default value
timeit getParallel(numFiles=1000)
请不要将此视为答案,它只是向您展示我在 Python 3.x 中运行的代码(您的 timeit 用法对我完全不起作用,我猜那是 2.x 的写法)。抱歉,我现在没有时间深入研究它。
[编辑] 在旋转驱动器上,考虑磁盘缓存:不要在不同的测试中访问相同的文件,或者只是切换测试顺序以查看是否涉及磁盘缓存
使用以下代码,手动更改 numProcesses=X 参数,我得到了这些结果:
在 SSD 上:顺序读取 1000 个文件耗时 0.31 秒;1 个进程并行读取 1000 个文件耗时 0.37 秒;4 个进程并行读取 1000 个文件耗时 0.23 秒。
import os
import random
import timeit
from multiprocessing import Pool
from contextlib import closing
# Bug fix: 'c:\temp\' is a SyntaxError -- the trailing backslash escapes the
# closing quote so the string literal never terminates, and '\t' is a tab
# character, not "\t". Use a raw string (or forward slashes) instead.
os.chdir(r'c:\temp')
def putfiles(numFiles=5, numCount=1):
    """Generate the benchmark inputs: write numFiles files named
    r0.txt, r1.txt, ... into the current directory, each holding
    numCount random integers in [1, 100], one per line."""
    for idx in range(numFiles):
        body = "\n".join(str(random.randint(1, 100)) for _ in range(numCount))
        with open('r' + str(idx) + '.txt', 'w') as out:
            out.write(body)
def readFile(fileurl):
    """Read every whitespace-separated integer in fileurl and write
    their sum (stringified) to a file named "ans_" + fileurl."""
    with open(fileurl, 'r') as src:
        tokens = src.read().split()
    result = sum(map(int, tokens))
    with open("ans_" + fileurl, 'w') as dst:
        dst.write(str(result))
def getSequential(numFiles=10000):
    """Sequential baseline: run readFile over r0.txt .. r{numFiles-1}.txt
    in the calling process, one file at a time."""
    names = ('r' + str(i) + '.txt' for i in range(numFiles))
    for name in names:
        readFile(name)
def getParallel(numFiles=10000, numProcesses=1):
    """Process r0.txt .. r{numFiles-1}.txt with a pool of worker processes.

    Bug fix: the pool was created with a hard-coded processes=1, silently
    ignoring the numProcesses argument -- every "parallel" run actually used
    a single worker. Pass the parameter through to Pool instead.
    """
    fileNames = ['r' + str(num) + '.txt' for num in range(numFiles)]
    # closing() guarantees the pool is shut down even if map() raises.
    with closing(Pool(processes=numProcesses)) as p:
        p.map(readFile, fileNames)
# The __main__ guard is required on Windows/macOS (spawn start method) so the
# Pool workers can re-import this module without re-running the benchmark.
if __name__ == '__main__':
#putfiles(numFiles=10000, numCount=1)
# number=1: each benchmark body is executed exactly once per measurement.
print (timeit.timeit ("getSequential()","from __main__ import getSequential",number=1))
print (timeit.timeit ("getParallel()","from __main__ import getParallel",number=1))
#timeit (getParallel(numFiles=100, numProcesses=4)) #-around 960ms best
#timeit (getParallel(numFiles=100, numProcesses=1)) #-around 980ms best