pathos: parallel processing options - 有人能解释一下差异吗?

pathos: parallel processing options - Could someone explain the differences?

我正在尝试 运行 在 python 下(在 ubuntu 上)并行处理。

我开始使用 multiprocessing,对于简单的示例它运行良好。
然后出现了 pickle 错误,所以我转向了 pathos。我对不同的选项有点困惑,所以写了一个非常简单的基准测试代码。

import multiprocessing as mp
from pathos.multiprocessing import Pool as Pool1
from pathos.pools import ParallelPool as Pool2
from pathos.parallel import ParallelPool as Pool3
import time

def square(x):  
    # calculate the square of the value of x
    return x*x

if __name__ == '__main__':

    dataset = range(0,10000)

    start_time = time.time()
    for d in dataset:
        square(d)
    print('test with no cores: %s seconds' %(time.time() - start_time))

    nCores = 3
    print('number of cores used: %s' %(nCores))  


    start_time = time.time()

    p = mp.Pool(nCores)
    p.map(square, dataset)

    # Close
    p.close()
    p.join()

    print('test with multiprocessing: %s seconds' %(time.time() - start_time))


    start_time = time.time()

    p = Pool1(nCores)
    p.map(square, dataset)

    # Close
    p.close()
    p.join()

    print('test with pathos multiprocessing: %s seconds' %(time.time() - start_time))


    start_time = time.time()

    p = Pool2(nCores)
    p.map(square, dataset)

    # Close
    p.close()
    p.join()

    print('test with pathos pools: %s seconds' %(time.time() - start_time))


    start_time = time.time()

    p = Pool3()
    p.ncpus = nCores
    p.map(square, dataset)

    # Close
    p.close()
    p.join()

    print('test with pathos parallel: %s seconds' %(time.time() - start_time))

我知道了
- 0.001s 纯串行代码,无并行,
- 0.100s multiprocessing 选项,
- pathos.multiprocessing
时 0.100 秒 - 4.470s pathos.pools,
- AssertionError 错误 pathos.parallel

我从 http://trac.mystic.cacr.caltech.edu/project/pathos/browser/pathos/examples.html

复制了如何使用这些不同的选项

我知道对于这样一个简单的例子,并行处理比普通的串行代码要长。我不明白的是pathos的相对表现。

我查看了讨论,但不明白为什么 pathos.pools 这么长,以及为什么我会收到错误消息(不确定最后一个选项的性能如何)。

我还尝试了一个简单的平方函数,因此即使 pathos.multiprocessing 也比 multiprocessing

长得多

有人可以解释一下这些不同选项之间的区别吗?

此外,我运行远程计算机上的pathos.multiprocessing选项,运行ning centOS,性能大约差10倍multiprocessing

据租用电脑的公司说,它应该像家用电脑一样工作。我知道如果没有关于机器的更多详细信息,可能很难提供信息,但如果您对它的来源有任何想法,那将会有所帮助。

有人能解释一下区别吗?

让我们从一些共同点开始。

Python 解释器作为标准使用 GIL-stepped code-execution。这意味着,所有基于 thread 的池仍然会等待所有 code-execution 路径的 GIL-stepped 排序,因此 any 这样构造的尝试将不会享受的好处“理论上预期”。

Python 解释器可以使用其他基于 process 的实例来加载多个进程,每个进程都有自己的 GIL-lock,形成多个池,并发 code-execution 条路径。

管理完这个校长 dis-ambiguation,接下来 performance-related 问题开始出现。最负责任的做法是benchmark,benchmark,benchmark。这里也不例外。


为什么要花这么多时间在这里(where)?

主要(常量)部分主要是 [TIME]-process-instantiation 的域成本。在这里,python 解释器的完整副本,包括所有变量,所有 memory-maps,实际上必须首先创建调用 python 解释器的完整 state-full-copy 并将其放置到操作系统 process-scheduler table,在任何进一步(工作的有用部分)计算 "inside" 这样成功实例化的 sub-process 可以发生之前。如果您的有效负载功能立即从那里 returns,创建了一个 x*x,您的代码似乎已经燃烧了一些 CPU-instructions ] 并且您在 return 中花费的远远多于收到的。 经济成本 对你不利,因为所有 process-instantiation 加上 process-termination 成本远高于几个 CPU-CLOCK 价格变动。

这实际需要多长时间?
您可以对此进行基准测试(如提议的 here,在提议的 Test-Case-A 中。如果 Stopwatch()-ed [us] 决定,你开始更多地依赖事实而不是任何类型的 wannabe-guru 或营销类型的建议。这很公平,不是吗?)。


Test-Case-A 基准 process-instantiation 成本 [已测量]。
下一步是什么?

下一个最危险的(变量大小)部分主要是[SPACE]域成本,但也有[TIME] 领域的影响,如果 [SPACE] - 分配成本开始超出小规模规模。

这种 add-on 开销成本与任何需要传递 "large" 大小的参数有关,从 "main"-python 解释器到每个(分布式)sub-process 个实例。

这需要多长时间?
再次,基准,基准,基准。应对此进行基准测试(如提议的 here,如果扩展那里提议的 Test-Case-C 并替换 aNeverConsumedPAR 参数确实有一些 "fat" 数据块,无论是 numpy.ndarray() 还是其他类型,都承载着一些巨大的数据memory-footprint.)

这样一来,真正的 hardware-related + O/S-related + python-related data-flow 成本开始变得可见,并可以在 **[us]**。这对 ol' 黑客来说并不是什么新鲜事,然而,那些从未遇到过 HDD-disk-write 次可能会增长并阻止其他处理 很多秒或分钟 的人很难相信,如果不通过自己的基准测试触及 data-flow 的实际成本。因此,请毫不犹豫地 将基准 Test-Case-C 扩展到确实很大的 memory-footprints 以闻到烟味 ...


最后但同样重要的是,re-formulated 阿姆达尔定律会告诉...

鉴于计算部分和所有 overhead-part(s) 都很好理解并行化某些计算的尝试,图片开始变得完整:

overhead-strict and resources-aware Amdahl's Law re-formulation显示:

                           1                         
S =  ______________________________________________ ;  where         s,
                    /                     \                    ( 1 - s ),
                   |  ( 1 - s )            |                       pSO,
     s  + pSO + max|  _________ , atomicP  |  + pTO                pTO,
                   |      N                |                         N
                    \                     /           have been defined in
                                                      just an Overhead-strict Law
and
atomicP := is a further indivisible duration of an atomic-process-block

由此产生的加速 S 总是遭受 高开销成本 pSO + pTO 与当 N 不允许进一步帮助时相同,因为 [=28 的值足够高=].

在所有这些情况下,最终加速 S 可能很容易 低于 << 1.0,是的,嗯在纯 [SERIAL] code-execution 路径调度下(再次,已经对 pSO 的实际成本进行了基准测试和 pTO(为此 Test-Case-A + Test-Case-C(extended)被示意性地提出)有机会推导出所需的最低合理 computing-payload 以保持在加速的神秘水平之上 >= 1.0

我是 pathos 的作者。对困惑感到抱歉。您正在处理新旧编程接口的混合。

"new"(建议)接口是使用pathos.pools。旧界面链接到相同的对象,所以这实际上是两种到达同一事物的方法。

multiprocess.Poolmultiprocessing.Pool 的分支,唯一的区别是 multiprocessing 使用 picklemultiprocess 使用 dill。所以,我希望在大多数简单情况下速度是相同的。

上述池也可以在 pathos.pools._ProcessPool 找到。 pathos 提供了一个围绕几种类型的池的小型包装器,具有不同的后端,提供了扩展功能。 pathos-wrapped 池是 pathos.pools.ProcessPool(旧接口在 pathos.multiprocessing.Pool 提供)。

首选接口是 pathos.pools.ProcessPool

还有 ParallelPool,它使用不同的后端——它使用 ppft 而不是 multiprocessppft 是 "parallel python",它通过 subprocess 生成 python 进程并传递源代码(使用 dill.source 而不是序列化对象)——它用于分布式计算,或者当通过源代码传递时是一个更好的选择。

因此,pathos.pools.ParallelPool 是首选接口,而 pathos.parallel.ParallelPool(以及 pathos 中的一些其他类似参考)由于遗留原因而流连忘返——但它们是相同的下面的物体。

总结:

>>> import multiprocessing as mp
>>> mp.Pool()
<multiprocessing.pool.Pool object at 0x10fa6b6d0>
>>> import multiprocess as mp
>>> mp.Pool()
<multiprocess.pool.Pool object at 0x11000c910>
>>> import pathos as pa
>>> pa.pools._ProcessPool()
<multiprocess.pool.Pool object at 0x11008b0d0>
>>> pa.multiprocessing.Pool()
<multiprocess.pool.Pool object at 0x11008bb10>
>>> pa.pools.ProcessPool()
<pool ProcessPool(ncpus=4)>
>>> pa.pools.ParallelPool()
<pool ParallelPool(ncpus=*, servers=None)>

您可以看到 ParallelPoolservers... 因此适用于分布式计算。

唯一剩下的问题是为什么 AssertionError?那是因为 pathos 添加的包装器保留了一个池对象以供重用。因此,当您第二次调用 ParallelPool 时,您调用的是一个封闭池。您需要 restart 池才能再次使用它。

>>> f = lambda x:x
>>> p = pa.pools.ParallelPool()
>>> p.map(f, [1,2,3])
[1, 2, 3]
>>> p.close()
>>> p.join()
>>> p.restart()  # throws AssertionError w/o this
>>> p.map(f, [1,2,3])
[1, 2, 3]
>>> p.close()
>>> p.join()
>>> p.clear()  # destroy the saved pool

ProcessPoolParallelPool 具有相同的界面,关于重新启动和清除保存的实例。