多处理池比手动实例化多个进程慢得多
Multiprocessing Pool much slower than manually instantiating multiple Processes
我正在从一个大文件中读取一个块,将其作为行列表加载到内存中,然后在每一行上处理一个任务。
顺序解决方案花费的时间太长,所以我开始研究如何并行化它。
我想到的第一个解决方案是使用流程并管理列表的每个子流程切片。
import multiprocessing as mp
BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = '1000000'
N_PROCESSES = mp.cpu_count()
def read_in_chunks(file_object, chunk_size=1024):
while True:
data = file_object.read(chunk_size)
if not data:
break
yield data
with open(BIG_FILE_PATH, encoding="Latin-1") as file:
for piece in read_in_chunks(file, CHUNKSIZE):
jobs = []
piece_list = piece.splitlines()
piece_list_len = len(piece_list)
item_delta = round(piece_list_len/N_PROCESSES)
start = 0
for process in range(N_PROCESSES):
finish = start + item_delta
p = mp.Process(target=work, args=(piece_list[start:finish]))
start = finish
jobs.append(p)
p.start()
for job in jobs:
job.join()
它在大约 2498 毫秒内完成每个块。
然后我发现了自动管理切片的池工具。
import multiprocessing as mp
BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = '1000000'
N_PROCESSES = mp.cpu_count()
def read_in_chunks(file_object, chunk_size=1024):
while True:
data = file_object.read(chunk_size)
if not data:
break
yield data
with open(BIG_FILE_PATH, encoding="Latin-1") as file:
with mp.Pool(N_PROCESSES) as pool:
for piece in read_in_chunks(file, CHUNKSIZE):
piece_list = piece.splitlines()
pool.map(work, piece_list)
它在大约 15540 毫秒内完成每个块,比手动慢 6 倍,但仍然比顺序快。
我是不是用错了Pool?
有更好或更快的方法吗?
感谢您的阅读。
更新
正如 Hannu 所建议的那样,Pool 的开销相当大。
Process 方法调用的工作函数需要行列表。
由于 Pool 决定切片的方式,Pool 方法调用的工作函数需要一行。
我不太确定如何让池一次给某个工作人员多行。
这应该能解决问题?
更新 2
最后一个问题,有第三种更好的方法吗?
我不知道这是否可行,但你可以试试这个吗?
if __name__ == "__main__":
with open(BIG_FILE_PATH, encoding="Latin-1") as file:
with mp.Pool(N_PROCESSES) as pool:
for piece in read_in_chunks(file, CHUNKSIZE):
piece_list = piece.splitlines()
pool.map(work, piece_list)
我的推理:
1。 pool.map() ,只需要一次,你的代码就会循环它
2。我猜循环使它变慢了
3。因为并行处理应该更快嘿嘿
我对此并不完全确定,但在我看来,您的程序在提交给工人的内容上存在重大差异。
在您的 Process 方法中,您似乎提交了大量行:
p = mp.Process(target=work, args=(piece_list[start:finish]))
但是当你使用 Pool 时,你会这样做:
for piece in read_in_chunks(file, CHUNKSIZE):
piece_list = piece.splitlines()
pool.map(work, piece_list)
您 读取 您的文件,但是当您使用 splitlines
时,您的 piece_list
可迭代 提交 以一为单位。
这意味着在您的流程方法中,您提交的子任务与您拥有的 CPU 一样多,但在您的 Pool 方法中,您提交的任务与您的源数据中的行数一样多。如果你有很多行,这将在你的 Pool 中产生大量的编排开销,因为每个工作人员一次只处理一行,然后完成,returns 结果,然后 Pool 将另一行提交给新释放的工作人员。
如果这就是这里发生的事情,那肯定可以解释为什么 Pool 需要更长的时间才能完成。
如果您使用 reader 作为可迭代对象并跳过行拆分部分会发生什么:
pool.map(work, read_in_chunks(file, CHUNKSIZE))
天哪!弄清楚这是一段很长的路要走,但仍然很有趣。
Pool.map 正在从迭代器中单独获取、腌制和传递每个项目给每个工作人员。工作人员完成后,冲洗并重复 get -> pickle -> pass。这会产生明显的管理费用。
这实际上是有意为之,因为 Pool.map 不够智能,无法知道迭代器的长度,也无法有效地创建列表列表并在其中传递每个列表(chunk) 给工人。
但是,这是有帮助的。
简单地将列表转换为块列表 (lists) 并使用列表理解就像一个魅力,并将开销减少到与 Process 方法相同的水平。
import multiprocessing as mp
BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = '1000000'
N_PROCESSES = mp.cpu_count()
def read_in_chunks(file_object, chunk_size=1024):
while True:
data = file_object.read(chunk_size)
if not data:
break
yield data
with open(BIG_FILE_PATH, encoding="Latin-1") as file:
with mp.Pool(N_PROCESSES) as pool:
for piece in read_in_chunks(file, CHUNKSIZE):
piece_list = piece.splitlines()
piece_list_len = len(piece_list)
item_delta = round(piece_list_len / N_PROCESSES)
pool.map(work, [piece_list[i:i + item_delta] for i in range(0, piece_list_len, item_delta)])
这个带有列表迭代器列表的池与 Process 方法的 运行 时间完全相同。
我正在从一个大文件中读取一个块,将其作为行列表加载到内存中,然后在每一行上处理一个任务。
顺序解决方案花费的时间太长,所以我开始研究如何并行化它。
我想到的第一个解决方案是使用流程并管理列表的每个子流程切片。
import multiprocessing as mp
BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = '1000000'
N_PROCESSES = mp.cpu_count()
def read_in_chunks(file_object, chunk_size=1024):
while True:
data = file_object.read(chunk_size)
if not data:
break
yield data
with open(BIG_FILE_PATH, encoding="Latin-1") as file:
for piece in read_in_chunks(file, CHUNKSIZE):
jobs = []
piece_list = piece.splitlines()
piece_list_len = len(piece_list)
item_delta = round(piece_list_len/N_PROCESSES)
start = 0
for process in range(N_PROCESSES):
finish = start + item_delta
p = mp.Process(target=work, args=(piece_list[start:finish]))
start = finish
jobs.append(p)
p.start()
for job in jobs:
job.join()
它在大约 2498 毫秒内完成每个块。
然后我发现了自动管理切片的池工具。
import multiprocessing as mp
BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = '1000000'
N_PROCESSES = mp.cpu_count()
def read_in_chunks(file_object, chunk_size=1024):
while True:
data = file_object.read(chunk_size)
if not data:
break
yield data
with open(BIG_FILE_PATH, encoding="Latin-1") as file:
with mp.Pool(N_PROCESSES) as pool:
for piece in read_in_chunks(file, CHUNKSIZE):
piece_list = piece.splitlines()
pool.map(work, piece_list)
它在大约 15540 毫秒内完成每个块,比手动慢 6 倍,但仍然比顺序快。
我是不是用错了Pool? 有更好或更快的方法吗?
感谢您的阅读。
更新
正如 Hannu 所建议的那样,Pool 的开销相当大。
Process 方法调用的工作函数需要行列表。
由于 Pool 决定切片的方式,Pool 方法调用的工作函数需要一行。
我不太确定如何让池一次给某个工作人员多行。
这应该能解决问题?
更新 2
最后一个问题,有第三种更好的方法吗?
我不知道这是否可行,但你可以试试这个吗?
if __name__ == "__main__":
with open(BIG_FILE_PATH, encoding="Latin-1") as file:
with mp.Pool(N_PROCESSES) as pool:
for piece in read_in_chunks(file, CHUNKSIZE):
piece_list = piece.splitlines()
pool.map(work, piece_list)
我的推理:
1。 pool.map() ,只需要一次,你的代码就会循环它
2。我猜循环使它变慢了
3。因为并行处理应该更快嘿嘿
我对此并不完全确定,但在我看来,您的程序在提交给工人的内容上存在重大差异。
在您的 Process 方法中,您似乎提交了大量行:
p = mp.Process(target=work, args=(piece_list[start:finish]))
但是当你使用 Pool 时,你会这样做:
for piece in read_in_chunks(file, CHUNKSIZE):
piece_list = piece.splitlines()
pool.map(work, piece_list)
您 读取 您的文件,但是当您使用 splitlines
时,您的 piece_list
可迭代 提交 以一为单位。
这意味着在您的流程方法中,您提交的子任务与您拥有的 CPU 一样多,但在您的 Pool 方法中,您提交的任务与您的源数据中的行数一样多。如果你有很多行,这将在你的 Pool 中产生大量的编排开销,因为每个工作人员一次只处理一行,然后完成,returns 结果,然后 Pool 将另一行提交给新释放的工作人员。
如果这就是这里发生的事情,那肯定可以解释为什么 Pool 需要更长的时间才能完成。
如果您使用 reader 作为可迭代对象并跳过行拆分部分会发生什么:
pool.map(work, read_in_chunks(file, CHUNKSIZE))
天哪!弄清楚这是一段很长的路要走,但仍然很有趣。
Pool.map 正在从迭代器中单独获取、腌制和传递每个项目给每个工作人员。工作人员完成后,冲洗并重复 get -> pickle -> pass。这会产生明显的管理费用。
这实际上是有意为之,因为 Pool.map 不够智能,无法知道迭代器的长度,也无法有效地创建列表列表并在其中传递每个列表(chunk) 给工人。
但是,这是有帮助的。 简单地将列表转换为块列表 (lists) 并使用列表理解就像一个魅力,并将开销减少到与 Process 方法相同的水平。
import multiprocessing as mp
BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = '1000000'
N_PROCESSES = mp.cpu_count()
def read_in_chunks(file_object, chunk_size=1024):
while True:
data = file_object.read(chunk_size)
if not data:
break
yield data
with open(BIG_FILE_PATH, encoding="Latin-1") as file:
with mp.Pool(N_PROCESSES) as pool:
for piece in read_in_chunks(file, CHUNKSIZE):
piece_list = piece.splitlines()
piece_list_len = len(piece_list)
item_delta = round(piece_list_len / N_PROCESSES)
pool.map(work, [piece_list[i:i + item_delta] for i in range(0, piece_list_len, item_delta)])
这个带有列表迭代器列表的池与 Process 方法的 运行 时间完全相同。