CPU 绑定任务 - 多处理方法性能比同步方法差 - 为什么?

CPU Bound Task - Multiprocessing Approach Performance Way Worse Than Synchronous Approach -Why?

我刚开始使用异步编程,我有一个关于 CPU 多处理绑定任务的问题。简而言之,为什么多处理产生的时间性能比同步方法差得多?我的异步版本代码有什么问题吗?欢迎提出任何建议!

1:任务描述

我想使用 the Google's Ngram datasets 中的一个作为输入,并创建一个包含每个单词和相应单词计数的巨大词典。

数据集中的每条记录如下所示:

"corpus\tyear\tWord_Count\t\Number_of_Book_Corpus_Showup"

示例:

“A'Aang_NOUN\t1879\t45\t5\n”

2:硬件信息: 英特尔酷睿 i5-5300U CPU @ 2.30 GHz 8GB 内存

3:同步版本 - 耗时 170.6280147 秒

import time

with open(r".\googlebooks-eng-all-1gram-20120701-a.gz",encoding='utf-8') as file:
    start = time.perf_counter()

    all_input = file.readlines()
    word_count_dict = {}

    for line in all_input:
        temp = line.replace('\n','').split('\t')
        if temp[0] not in word_count_dict.keys():
            word_count_dict[temp[0]] = temp[2]
        else:
            word_count_dict[temp[0]] += temp[2]

    print(f'total time used for sync version as {time.perf_counter() - start}')

4:异步版本 - 耗时 611.5669237 秒

import asyncio
from concurrent.futures import ProcessPoolExecutor
import functools
import time

def data_spliting(input_data,chunk_size): # todo see next part for how to set chunk size
    for x in range(0,len(input_data),chunk_size):
        yield input_data[x:x+chunk_size]

def single_chunk_dict(chunk):
    result = {}
    for line in chunk:
        temp = line.replace('\n','').split('\t')
        if temp[0] not in result.keys():
            result[temp[0]] = temp[2]
        else:
            result[temp[0]] += temp[2]

    return result

def word_reduce(first_dict,second_dict):
    result = {}
    for map in [first_dict,second_dict]:
        for key, value in map.items():
            if key not in result.keys():
                result[key] = value
            else:
                result[key] += value
    return result

async def main():
    with open(r".\googlebooks-eng-all-1gram-20120701-a.gz",encoding='utf-8') as file:
        test = file.readlines()

    with ProcessPoolExecutor() as process_pool:
        loop = asyncio.get_running_loop()
        tasks = [functools.partial(single_chunk_dict,ch) for ch in data_spliting(test,21654626)]
        result = [loop.run_in_executor(process_pool,x) for x in tasks]
        result = await asyncio.gather(*result)

        output = functools.reduce(lambda x,y: word_reduce(x,y),result)
        print(f'output total keys = {len(output.keys())}')


if __name__ == '__main__':
    start = time.perf_counter()
    asyncio.run(main())
    print(f'Total Time for Completion as {time.perf_counter() - start}')

关于彼得斯先生回答的更多问题。

我不敢相信蒂姆·彼得斯会回答我的问题。这太酷了!

1: 在我正在看的书中,作者使用这个任务来演示MapReduce。我想知道 MapReduce 是否适合多处理。

2:在书中,作者建议将ProcessPoolExecutor与asyncio事件循环挂钩,让我们使用API函数,如gather()和as_complete()。将 ProcessPoolExecutor 与 asyncio 混合使用是一种好习惯吗?还是我应该坚持使用 ProcessPoolExecutor 中的 map() 函数?

3:“粗粒度并行性的“理想”候选者对需要在进程之间传输的每个字节进行大量计算,根本不需要太多进程间通信。”

“进程间传递”和“进程间通信”是什么意思?

你的代码中有很多地方我不明白。所以我只会给你有效的代码 ;-)

  • 我对您的代码如何 运行 感到困惑。 .gz 文件是压缩的二进制数据(gzip 压缩)。您应该需要用Python的gzip.open()打开它。照原样,我希望它会因编码异常而死,就像我尝试它时那样。

  • temp[2] 不是整数。这是一个字符串。您不是在此处添加整数,而是使用 + 连接字符串。 int()需要先申请。

  • 我不相信我以前见过 asyncioconcurrent.futures 混在一起。没有必要。 asyncio针对单线程中的fine-grainedpseudo-concurrency; concurrent.futures 旨在 coarse-grained 真正的跨进程并发。你想要后者。没有 asyncio.

    代码更容易、更简单、更快
  • 虽然 concurrent.futures 很好,但我已经足够大了,所以我投入了大量资金来首先学习较旧的 multiprocessing,所以我在这里使用它。

  • 这些 ngram 文件足够大,无论 运行 是串行版本还是并行版本,我都可以“分块”读取。

  • collections.Counter 比普通字典更适合您的任务。

  • 虽然我的机器比你快,但上面提到的一些变化与我更快的时间有很大关系。

  • 我确实使用 3 个工作进程获得了加速,但实际上,所有 3 个都几乎没有被使用过。每 的输入进行的计算很少,我预计它 memory-bound 多于 CPU-bound。所有进程也在争夺缓存space,缓存未命中的代价很高。 coarse-grained 并行性的“理想”候选者对需要在进程之间传输的每个字节进行大量计算,根本不需要太多 inter-process 通信。这个问题都不是真的。

from time import perf_counter as now
import gzip
from collections import Counter

PATH = r".\googlebooks-eng-all-1gram-20120701-a.gz"

def chunked_read(f, byte_limit=10**9):
    while True:
        lines = f.readlines(byte_limit)
        if lines:
            print("returning", format(len(lines), ','), "lines")
            yield lines
        else:
            break

def crunch(lines):
    c = Counter()
    for line in lines:
        temp = line.split('\t')
        c[temp[0]] += int(temp[2])
    return c

def show_counter(tag, c):
    print(tag, f"{len(c) = :,} {c.total() = :,}")

if __name__ == "__main__":
    if 1:
        start = now()
        word_count_dict = Counter()
        with gzip.open(PATH, "rt", encoding='utf-8') as f:
            for lines in chunked_read(f):
                word_count_dict += crunch(lines)

        print(f'total time used for sync version was {now() - start}')
        show_counter("word_count_dict", word_count_dict)
        del lines

    if 1:
        import multiprocessing as mp
        MAXWORKERS = 3
        start = now()
        d = Counter()
        with gzip.open(PATH, "rt", encoding='utf-8') as f, \
             mp.Pool(MAXWORKERS) as p:
            for r in p.imap_unordered(crunch,
                         chunked_read(f, 100_000_000)):
                d += r
        print(f'total time used for mp version was {now() - start}')
        show_counter("d", d)

        assert word_count_dict == d  # verify same result either way

和一个运行的输出:

returning 48,118,459 lines
returning 38,500,046 lines
total time used for sync version was 73.44404479999866
word_count_dict len(c) = 1,440,378 c.total() = 88,179,952,324
returning 4,813,692 lines
returning 4,816,691 lines
returning 4,807,902 lines
returning 4,814,039 lines
returning 4,806,524 lines
returning 4,812,883 lines
returning 4,808,835 lines
returning 4,812,116 lines
returning 4,811,212 lines
returning 4,814,568 lines
returning 4,811,115 lines
returning 4,811,647 lines
returning 4,818,276 lines
returning 4,813,439 lines
returning 4,819,090 lines
returning 4,813,714 lines
returning 4,815,252 lines
returning 4,797,510 lines
total time used for mp version was 59.9362641000007
d len(c) = 1,440,378 c.total() = 88,179,952,324

编辑:使用 concurrent.futures 代替

这里使用 concurrent.futures 而不是 multiprocessing 确实既不简单也不难,尽管 mp 有很多花哨的东西,可能需要一段时间才能意识到这一点。时间也并不重要:在幕后,时间绝大部分被 OS-level inter-process 管道或套接字消耗。您使用哪个 higher-level API 来达到目的与速度无关紧要。只需像这样替换 mp 部分:


        import concurrent.futures as cf
        ...
        with gzip.open(PATH, "rt", encoding='utf-8') as f, \
             cf.ProcessPoolExecutor(max_workers=MAXWORKERS) as ex:
            for r in ex.map(crunch, chunked_read(f, 100_000_000)):
                d += r

的确,代码本质上是一样的。

问答

“MapReduce”是一个具有多种含义的艺术术语,主要指一种思考安排并行计算的方式,尤其是指该模型的 Apache Hadoop 实现。标准 (python.org) Python 发行版不直接支持它。

在我向您展示的代码中,“地图”部分由名称中带有“地图”的函数拼写而成(multiprocessingimap_unordered()concurrent.futuremap())。 “reduce”部分被简单地拼写为“+=”——Counters 直接支持将其中两个组合起来的方式,这比任何间接方式都(或应该)更明显、更有效。

如果你想认真研究 MapReduce,我建议下载专门针对该框架的精心制作的软件包。在极端情况下(如 Hadoop),他们会想要实现自己的文件系统。

这里也不需要 asyncio 中的“完整”部分。 Poolimap_unordered()(我的代码使用了它)直接实现了这一点,并在完成时返回结果。虽然我没有在这里展示它,但 concurrent.futures 提供了一个 as_completed() 功能,其功能大致相同。同样,由于包直接实现了这些,因此它们比任何间接方式都更加明显和高效。

我的目的是不鼓励您使用 asyncio 除非确实需要(这里不是)。可以肯定的是,大多数 Python 程序员不知道 asyncio 的作用,因此无法使用它来跟踪、调试或扩展代码。 multiprocessingconcurrent.futures 本身就具有足够的挑战性 ;-)

关于IPC(inter-process通信),你的OS保证两个进程有不同的地址space。 他们之间没有共享。 Python 进程和其他进程一样。例如,如果您在 multi-process Python 程序中有一个字符串对象,它只对一个进程可见。如果你想让另一个进程看到它,你需要 IPC 将对象“发送”到另一个进程。这远非免费。在幕后,字符串被 pickle 模块转换为字节序列,该字节序列被推入 OS-level 套接字或管道 inter-process 连接,并且接收进程有获取该序列并将其解开以重建原始字符串对象的副本。

在这个问题中,一个巨大的瓶颈是输入数据文件。它是压缩的二进制数据,因此无法告诉不同的进程在文件中的偏移量,因此它们都可以 start 在自己的部分上。相反,一个进程必须解压缩它并使用 IPC 将行发送(再次,物理副本!)到其他进程。解压缩后,这将关闭 2 GB 的原始数据,所有这些原始数据都将通过多层软件(pickling、管道或套接字的两端、unpickling)一次处理一个字节来发送给处理者。幕后有很多机器来支持这一点,none 其中免费工作,包括使用辅助线程和 inter-process 锁的方法,以确保所有数据保持正常并且不会“冻结” " 发送或接收进程。

什么是“理想”应用程序?例如,您有一个包含一百万个大整数的列表,您要将其分解(分解为质数)。一个 200 位的整数只有大约 200 个字节长。跨进程发送不是免费的,但与考虑它可能需要的年数相比成本微不足道;-) 如果你想为每个整数加 1,那么跨进程发送整数的成本是 远远比加1的成本高

在手头的例子中,实际完成的工作包括将一条短线分成几段(在制表符上拆分),将表示小整数的字符串转换为 int,然后将该 int 添加到 运行ning total 在由另一段字符串索引的字典中。不是免费的,但也不贵。除非输入文件被重新处理以便多个进程可以独立地从中读取(例如,如果数据以 100 个不同的文件开始存储 - 那么进程就不需要通信 完全 除了在最后合并他们的命令)。