Python 多处理进程在一段时间后休眠
Python multiprocessing processes sleep after a while
我有一个脚本,它在一个目录中运行,并在所有具有给定结尾(即 .xml)的文件中搜索给定字符串并替换它们。为此,我使用了 python 多处理库。
例如,我使用了 1100 个 .xml 文件,其中包含大约 200MB 的数据。在我的 MBP '15 15".
上,完整的执行时间是 8 分钟
但是几分钟后,进程的进程将进入睡眠状态,我在“顶部”看到了这一点(这里是在 7m 之后...)。
最高输出
PID COMMAND %CPU TIME #TH #WQ #PORT MEM PURG CMPR PGRP PPID STATE BOOSTS %CPU_ME %CPU_OTHRS
1007 Python 0.0 07:03.51 1 0 7 5196K 0B 0B 998 998 sleeping *0[1] 0.00000 0.00000
1006 Python 99.8 07:29.07 1/1 0 7 4840K 0B 0B 998 998 running *0[1] 0.00000 0.00000
1005 Python 0.0 02:10.02 1 0 7 4380K 0B 0B 998 998 sleeping *0[1] 0.00000 0.00000
1004 Python 0.0 04:24.44 1 0 7 4624K 0B 0B 998 998 sleeping *0[1] 0.00000 0.00000
1003 Python 0.0 04:25.34 1 0 7 4572K 0B 0B 998 998 sleeping *0[1] 0.00000 0.00000
1002 Python 0.0 04:53.40 1 0 7 4612K 0B 0B 998 998 sleeping *0[1] 0.00000 0.00000
所以现在只有一个进程在做所有的工作,而其他进程在 4 分钟后就睡着了。
代码片段
# set cpu pool to cores in computer
pool_size = multiprocessing.cpu_count()
# create pool
pool = multiprocessing.Pool(processes=pool_size)
# give pool function and input data - here for each file in file_list
pool_outputs = pool.map(check_file, file_list)
# if no more tasks are available: close all
pool.close()
pool.join()
那么为什么所有进程都进入休眠状态?
我的猜测:文件列表与池中的所有工作人员分开(每个工作人员数量相同),少数人只是“幸运”地获得了小文件 - 因此更早完成。这是真的吗?我只是在想它的工作方式更像是一个队列,这样每个工作人员在完成后都会得到一个新文件 - 直到列表为空。
正如@Felipe-Lema 指出的那样,它是经典的 RTFM。
我使用多处理队列而不是池重新编写了脚本中提到的部分并改进了运行时间:
def check_files(file_list):
"""Checks and replaces lines in files
@param file_list: list of files to search
@return counter: number of occurrence """
# as much workers as CPUs are available (HT included)
workers = multiprocessing.cpu_count()
# create two queues: one for files, one for results
work_queue = Queue()
done_queue = Queue()
processes = []
# add every file to work queue
for filename in file_list:
work_queue.put(filename)
# start processes
for w in xrange(workers):
p = Process(target=worker, args=(work_queue, done_queue))
p.start()
processes.append(p)
work_queue.put('STOP')
# wait until all processes finished
for p in processes:
p.join()
done_queue.put('STOP')
# beautify results and return them
results = []
for status in iter(done_queue.get, 'STOP'):
if status is not None:
results.append(status)
return results
我有一个脚本,它在一个目录中运行,并在所有具有给定结尾(即 .xml)的文件中搜索给定字符串并替换它们。为此,我使用了 python 多处理库。
例如,我使用了 1100 个 .xml 文件,其中包含大约 200MB 的数据。在我的 MBP '15 15".
上,完整的执行时间是 8 分钟但是几分钟后,进程的进程将进入睡眠状态,我在“顶部”看到了这一点(这里是在 7m 之后...)。
最高输出
PID COMMAND %CPU TIME #TH #WQ #PORT MEM PURG CMPR PGRP PPID STATE BOOSTS %CPU_ME %CPU_OTHRS
1007 Python 0.0 07:03.51 1 0 7 5196K 0B 0B 998 998 sleeping *0[1] 0.00000 0.00000
1006 Python 99.8 07:29.07 1/1 0 7 4840K 0B 0B 998 998 running *0[1] 0.00000 0.00000
1005 Python 0.0 02:10.02 1 0 7 4380K 0B 0B 998 998 sleeping *0[1] 0.00000 0.00000
1004 Python 0.0 04:24.44 1 0 7 4624K 0B 0B 998 998 sleeping *0[1] 0.00000 0.00000
1003 Python 0.0 04:25.34 1 0 7 4572K 0B 0B 998 998 sleeping *0[1] 0.00000 0.00000
1002 Python 0.0 04:53.40 1 0 7 4612K 0B 0B 998 998 sleeping *0[1] 0.00000 0.00000
所以现在只有一个进程在做所有的工作,而其他进程在 4 分钟后就睡着了。
代码片段
# set cpu pool to cores in computer
pool_size = multiprocessing.cpu_count()
# create pool
pool = multiprocessing.Pool(processes=pool_size)
# give pool function and input data - here for each file in file_list
pool_outputs = pool.map(check_file, file_list)
# if no more tasks are available: close all
pool.close()
pool.join()
那么为什么所有进程都进入休眠状态?
我的猜测:文件列表与池中的所有工作人员分开(每个工作人员数量相同),少数人只是“幸运”地获得了小文件 - 因此更早完成。这是真的吗?我只是在想它的工作方式更像是一个队列,这样每个工作人员在完成后都会得到一个新文件 - 直到列表为空。
正如@Felipe-Lema 指出的那样,它是经典的 RTFM。
我使用多处理队列而不是池重新编写了脚本中提到的部分并改进了运行时间:
def check_files(file_list):
"""Checks and replaces lines in files
@param file_list: list of files to search
@return counter: number of occurrence """
# as much workers as CPUs are available (HT included)
workers = multiprocessing.cpu_count()
# create two queues: one for files, one for results
work_queue = Queue()
done_queue = Queue()
processes = []
# add every file to work queue
for filename in file_list:
work_queue.put(filename)
# start processes
for w in xrange(workers):
p = Process(target=worker, args=(work_queue, done_queue))
p.start()
processes.append(p)
work_queue.put('STOP')
# wait until all processes finished
for p in processes:
p.join()
done_queue.put('STOP')
# beautify results and return them
results = []
for status in iter(done_queue.get, 'STOP'):
if status is not None:
results.append(status)
return results