Python 多处理 imap 块大小
Python Multiprocessing imap chunksize
我想比较具有相同路径结构和所有子文件夹中相同文件的两个文件夹。文件夹很大,大小约80GB,文件数8000个。
我想确保两个顶级目录下的每个对应文件对具有相同的 md5 校验和值。我写了一个简单的树DFS函数搜索两个目录下的所有文件,根据文件大小排序,将它们存储在两个列表中。
当我遍历列表时,我发现进行所有比较非常耗时,而且 CPU 使用率也很低。
我认为多处理模块对这种情况很有用。这是我的多处理实现:
from multiprocessing import Pool, cpu_count
import hashlib
def calc_md5(item):
m = hashlib.md5()
with open(item, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
m.update(chunk)
return m.hexdigest()
def worker(args):
a, b = args
return calc_md5(a) == calc_md5(b)
def multi_compare(queue_a, queue_b, thread):
pool = Pool(processes = cpu_count() - 1)
# Task iterable
task = zip(queue_a, queue_b)
# Multiprocessing
for retval in pool.imap_unordered(worker, task, chunksize = 5):
if not retval:
print "Bad Detected"
这里queue_a和queue_b是按照文件大小排序的待比较文件的路径。我期望这种多处理方法具有更高的 CPU 使用率和更好的性能,但事实似乎并非如此。简单的顺序迭代大约需要 3200 秒才能完成,而多处理方法大约需要 4600 秒。
我很好奇为什么会这样?这是使用多处理的好点吗?我的代码中这种糟糕性能的瓶颈是什么?有什么办法可以改善吗?
编辑:
我根据我的直觉设置了块大小。我想我可以将其更改为 queue_a 或 queue_b 的长度除以线程号并将任务队列排序为包含 queue_a[0::thread] 的前 1/4或 queue_b[0::thread] 元素,反之亦然。这会将相似大小的任务提供给所有线程,并使所有线程始终处于忙碌状态。我不知道这是否是获得额外性能的好方法,我仍在对此进行测试。
编辑:
以上编辑中的测试需要 4000 秒才能完成。比 chunksize = 5 稍微好一些。仍然比 serial 方法差。
所以我想问一下如何确定这个多处理程序的瓶颈。
谢谢!
是IO限制了性能。
MD5 算法现在对于 CPUs 来说太容易了。
以下代码计算GB/s.
中的MD5性能
import time
import hashlib
from multiprocessing import Pool
def worker(x):
data = bytearray(xrange(256)) * 4 * 1024
md5 = hashlib.md5()
for x in xrange(1024):
md5.update(data)
if __name__ == '__main__':
num_workers = 4
pool = Pool(num_workers)
start = time.time()
pool.map(worker, xrange(num_workers))
print num_workers / (time.time() - start), 'Gb/s'
相对较弱的intel现代移动i3 CPU(2核4线程)
能够以每秒 1 Gb 的速度散列。将此与
SATA3 bandwidth 即 600 Mb/s。
所以即使使用SSD,磁盘接口也会限制散列速度。
在 HDD 上,情况更糟。
多个 readers 将迫使磁盘移动其读取磁头,从而导致比仅使用一个 reader 线程时更多的延迟。
这就像阅读一个碎片化程度很高的文件。
当数据集不是那么大时 OS 的文件缓存可以提供很大帮助。不过,这不是你的情况。
我想比较具有相同路径结构和所有子文件夹中相同文件的两个文件夹。文件夹很大,大小约80GB,文件数8000个。
我想确保两个顶级目录下的每个对应文件对具有相同的 md5 校验和值。我写了一个简单的树DFS函数搜索两个目录下的所有文件,根据文件大小排序,将它们存储在两个列表中。
当我遍历列表时,我发现进行所有比较非常耗时,而且 CPU 使用率也很低。
我认为多处理模块对这种情况很有用。这是我的多处理实现:
from multiprocessing import Pool, cpu_count
import hashlib
def calc_md5(item):
m = hashlib.md5()
with open(item, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
m.update(chunk)
return m.hexdigest()
def worker(args):
a, b = args
return calc_md5(a) == calc_md5(b)
def multi_compare(queue_a, queue_b, thread):
pool = Pool(processes = cpu_count() - 1)
# Task iterable
task = zip(queue_a, queue_b)
# Multiprocessing
for retval in pool.imap_unordered(worker, task, chunksize = 5):
if not retval:
print "Bad Detected"
这里queue_a和queue_b是按照文件大小排序的待比较文件的路径。我期望这种多处理方法具有更高的 CPU 使用率和更好的性能,但事实似乎并非如此。简单的顺序迭代大约需要 3200 秒才能完成,而多处理方法大约需要 4600 秒。
我很好奇为什么会这样?这是使用多处理的好点吗?我的代码中这种糟糕性能的瓶颈是什么?有什么办法可以改善吗?
编辑: 我根据我的直觉设置了块大小。我想我可以将其更改为 queue_a 或 queue_b 的长度除以线程号并将任务队列排序为包含 queue_a[0::thread] 的前 1/4或 queue_b[0::thread] 元素,反之亦然。这会将相似大小的任务提供给所有线程,并使所有线程始终处于忙碌状态。我不知道这是否是获得额外性能的好方法,我仍在对此进行测试。
编辑: 以上编辑中的测试需要 4000 秒才能完成。比 chunksize = 5 稍微好一些。仍然比 serial 方法差。 所以我想问一下如何确定这个多处理程序的瓶颈。
谢谢!
是IO限制了性能。 MD5 算法现在对于 CPUs 来说太容易了。 以下代码计算GB/s.
中的MD5性能import time
import hashlib
from multiprocessing import Pool
def worker(x):
data = bytearray(xrange(256)) * 4 * 1024
md5 = hashlib.md5()
for x in xrange(1024):
md5.update(data)
if __name__ == '__main__':
num_workers = 4
pool = Pool(num_workers)
start = time.time()
pool.map(worker, xrange(num_workers))
print num_workers / (time.time() - start), 'Gb/s'
相对较弱的intel现代移动i3 CPU(2核4线程)
能够以每秒 1 Gb 的速度散列。将此与
SATA3 bandwidth 即 600 Mb/s。
所以即使使用SSD,磁盘接口也会限制散列速度。
在 HDD 上,情况更糟。
多个 readers 将迫使磁盘移动其读取磁头,从而导致比仅使用一个 reader 线程时更多的延迟。
这就像阅读一个碎片化程度很高的文件。
当数据集不是那么大时 OS 的文件缓存可以提供很大帮助。不过,这不是你的情况。