Python 单工多处理比顺序操作更快

Python multiprocessing with single worker faster than sequential operation

简要概述 - 我将一些包含大量随机数的随机文件写入光盘以测试 python 多处理与顺序操作的性能。

Function description

putfiles : 将测试文件写入驱动器

readFile : 读取传递的文件位置和 returns 结果(代码中的数字总和)

getSequential : 使用 for 循环读取一些文件

getParallel : 使用生成的多个进程读取文件

Performance results: (Read and process 100 files, with sequential and process pool)

timeit getSequential(numFiles=100) - 最好大约 2.85s

timeit getParallel(numFiles=100, numProcesses=4) - 大约 960 毫秒最好

timeit getParallel(numFiles=100, numProcesses=1) - 大约 980 毫秒最好

令人惊讶的是,单进程池的性能优于顺序进程,与 4 进程池相当。这种行为是预期的还是我在这里做错了什么?

import os
import random
from multiprocessing import Pool

os.chdir('/Users/test/Desktop/filewritetest')

def putfiles(numFiles=5, numCount=100):
    #numFiles = int(input("how many files?: "))
    #numCount = int(input('How many random numbers?: '))
    for num in range(numFiles):
        with open('r' + str(num) + '.txt', 'w') as f:
            f.write("\n".join([str(random.randint(1, 100)) for i in range(numCount)]))

def readFile(fileurl):
    with open(fileurl, 'r') as f, open("ans_" + fileurl, 'w') as fw:
        fw.write(str((sum([int(i) for i in f.read().split()]))))

def getSequential(numFiles=5):
    #in1 = int(input("how many files?: "))
    for num in range(numFiles):
        (readFile('r' + str(num) + '.txt'))


def getParallel(numFiles=5, numProcesses=2):
    #numFiles = int(input("how many files?: ")) 
    #numProcesses = int(input('How many processes?: '))
    with Pool(numProcesses) as p:
        p.map(readFile, ['r' + str(num) + '.txt' for num in range(numFiles)])


#putfiles()

putfiles(numFiles=1000, numCount=100000)

timeit getSequential(numFiles=100)
##around 2.85s best

timeit getParallel(numFiles=100, numProcesses=1)
##around 980ms best
timeit getParallel(numFiles=100, numProcesses=4)
##around 960ms best

Update: in a new session of sypder, I don't see this issue. Updated runtime below

##100 files
#around 2.97s best
timeit getSequential(numFiles=100)

#around 2.99s best
timeit getParallel(numFiles=100, numProcesses=1)

#around 1.57s best
timeit getParallel(numFiles=100, numProcesses=2)

#around 942ms best
timeit getParallel(numFiles=100, numProcesses=4)

##1000 files
#around 29.3s best
timeit getSequential(numFiles=1000)

#around 11.8s best
timeit getParallel(numFiles=1000, numProcesses=4)

#around 9.6s best
timeit getParallel(numFiles=1000, numProcesses=16)

#around 9.65s best  #let pool choose best default value
timeit getParallel(numFiles=1000)

请不要将此视为答案,它是为了在 运行 python 3.x 中的内容时向您展示我的代码(您的 timeit 用法根本不起作用我,我假设它是 2.x)。对不起,我现在没有时间深入研究它。

[编辑] 在旋转驱动器上,考虑磁盘缓存:不要在不同的测试中访问相同的文件,或者只是切换测试顺序以查看是否涉及磁盘缓存

使用以下代码,手动更改 numProcesses=X 参数,我得到了这些结果:

在 SSD 上,1000 个顺序 0.31 秒,1 个线程 1000 个并行 0.37 秒,4 个线程 0.23 1000 个并行

import os
import random
import timeit
from multiprocessing import Pool
from contextlib import closing

os.chdir('c:\temp\')

def putfiles(numFiles=5, numCount=1):
    #numFiles = int(input("how many files?: "))
    #numCount = int(input('How many random numbers?: '))
    for num in range(numFiles):
        #print("num: " + str(num))
        with open('r' + str(num) + '.txt', 'w') as f:
            f.write("\n".join([str(random.randint(1, 100)) for i in range( numCount )]))
    #print ("pufiles done")

def readFile(fileurl):
    with open(fileurl, 'r') as f, open("ans_" + fileurl, 'w') as fw:
        fw.write(str((sum([int(i) for i in f.read().split()]))))


def getSequential(numFiles=10000):
   # print ("getSequential, nufile: " + str (numFiles))
    #in1 = int(input("how many files?: "))
    for num in range(numFiles): 
        #print ("getseq for")
        (readFile('r' + str(num) + '.txt'))
    #print ("getSequential done")


def getParallel(numFiles=10000, numProcesses=1):
    #numFiles = int(input("how many files?: ")) 
    #numProcesses = int(input('How many processes?: '))
    #readFile, ['r' + str(num) + '.txt' for num in range(numFiles)]
    #with Pool(10) as p:
    with closing(Pool(processes=1)) as p:
       p.map(readFile, ['r' + str(num) + '.txt' for num in range(numFiles)])

if __name__ == '__main__':
    #putfiles(numFiles=10000, numCount=1)

    print (timeit.timeit ("getSequential()","from __main__ import getSequential",number=1))

    print (timeit.timeit ("getParallel()","from __main__ import getParallel",number=1)) 

#timeit (getParallel(numFiles=100, numProcesses=4)) #-around 960ms best

#timeit (getParallel(numFiles=100, numProcesses=1)) #-around 980ms best