多处理池比手动实例化多个进程慢得多

Multiprocessing Pool much slower than manually instantiating multiple Processes

我正在从一个大文件中读取一个块,将其作为行列表加载到内存中,然后在每一行上处理一个任务。

顺序解决方案花费的时间太长,所以我开始研究如何并行化它。

我想到的第一个解决方案是使用流程并管理列表的每个子流程切片。

import multiprocessing as mp

BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = '1000000'
N_PROCESSES = mp.cpu_count()


def read_in_chunks(file_object, chunk_size=1024):
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


with open(BIG_FILE_PATH, encoding="Latin-1") as file:
    for piece in read_in_chunks(file, CHUNKSIZE):
        jobs = []
        piece_list = piece.splitlines()
        piece_list_len = len(piece_list)
        item_delta = round(piece_list_len/N_PROCESSES)
        start = 0
        for process in range(N_PROCESSES):
            finish = start + item_delta
            p = mp.Process(target=work, args=(piece_list[start:finish]))
            start = finish
            jobs.append(p)
            p.start()
        for job in jobs:
            job.join()

它在大约 2498 毫秒内完成每个块。

然后我发现了自动管理切片的池工具。

import multiprocessing as mp

BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = '1000000'
N_PROCESSES = mp.cpu_count()


def read_in_chunks(file_object, chunk_size=1024):
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


with open(BIG_FILE_PATH, encoding="Latin-1") as file:
    with mp.Pool(N_PROCESSES) as pool:
        for piece in read_in_chunks(file, CHUNKSIZE):
            piece_list = piece.splitlines()
            pool.map(work, piece_list)

它在大约 15540 毫秒内完成每个块,比手动慢 6 倍,但仍然比顺序快。

我是不是用错了Pool? 有更好或更快的方法吗?

感谢您的阅读。

更新

正如 Hannu 所建议的那样,Pool 的开销相当大。

Process 方法调用的工作函数需要行列表。

由于 Pool 决定切片的方式,Pool 方法调用的工作函数需要一行。

我不太确定如何让池一次给某个工作人员多行。

这应该能解决问题?

更新 2

最后一个问题,有第三种更好的方法吗?

我不知道这是否可行,但你可以试试这个吗?

if __name__ == "__main__":
    with open(BIG_FILE_PATH, encoding="Latin-1") as file:
        with mp.Pool(N_PROCESSES) as pool:
            for piece in read_in_chunks(file, CHUNKSIZE):
                piece_list = piece.splitlines()
            pool.map(work, piece_list)

我的推理:
1。 pool.map() ,只需要一次,你的代码就会循环它
2。我猜循环使它变慢了
3。因为并行处理应该更快嘿嘿

我对此并不完全确定,但在我看来,您的程序在提交给工人的内容上存在重大差异。

在您的 Process 方法中,您似乎提交了大量行:

p = mp.Process(target=work, args=(piece_list[start:finish]))

但是当你使用 Pool 时,你会这样做:

for piece in read_in_chunks(file, CHUNKSIZE):
    piece_list = piece.splitlines()
    pool.map(work, piece_list)

读取 您的文件,但是当您使用 splitlines 时,您的 piece_list 可迭代 提交 以一为单位。

这意味着在您的流程方法中,您提交的子任务与您拥有的 CPU 一样多,但在您的 Pool 方法中,您提交的任务与您的源数据中的行数一样多。如果你有很多行,这将在你的 Pool 中产生大量的编排开销,因为每个工作人员一次只处理一行,然后完成,returns 结果,然后 Pool 将另一行提交给新释放的工作人员。

如果这就是这里发生的事情,那肯定可以解释为什么 Pool 需要更长的时间才能完成。

如果您使用 reader 作为可迭代对象并跳过行拆分部分会发生什么:

pool.map(work, read_in_chunks(file, CHUNKSIZE))

天哪!弄清楚这是一段很长的路要走,但仍然很有趣。

Pool.map 正在从迭代器中单独获取、腌制和传递每个项目给每个工作人员。工作人员完成后,冲洗并重复 get -> pickle -> pass。这会产生明显的管理费用。

这实际上是有意为之,因为 Pool.map 不够智能,无法知道迭代器的长度,也无法有效地创建列表列表并在其中传递每个列表(chunk) 给工人。

但是,这是有帮助的。 简单地将列表转换为块列表 (lists) 并使用列表理解就像一个魅力,并将开销减少到与 Process 方法相同的水平。

import multiprocessing as mp

BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = '1000000'
N_PROCESSES = mp.cpu_count()


def read_in_chunks(file_object, chunk_size=1024):
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


with open(BIG_FILE_PATH, encoding="Latin-1") as file:
    with mp.Pool(N_PROCESSES) as pool:
        for piece in read_in_chunks(file, CHUNKSIZE):
            piece_list = piece.splitlines()
            piece_list_len = len(piece_list)
            item_delta = round(piece_list_len / N_PROCESSES)
            pool.map(work, [piece_list[i:i + item_delta] for i in range(0, piece_list_len, item_delta)])

这个带有列表迭代器列表的池与 Process 方法的 运行 时间完全相同。