Python: _pickle.PicklingError: Can't pickle <function <lambda>>

I am running Python 3.9.1.

Note: I know there are other questions with similar titles, but they are embedded in complicated code that makes the underlying problem hard to follow. This is a bare-bones reproduction, which I think others will find easier to understand.

Edit: my code uses Pool(processes=64). Most people will probably need to change this to match the number of cores on their machine. And if it takes too long, reduce listLen to a smaller number.

I'm trying to learn about multiprocessing in order to solve a problem at work. I have a list of arrays on which I need to do pairwise comparisons, but to keep things simple I've recreated the gist of the problem with plain integers instead of arrays and an addition function instead of a call to some elaborate comparison function. With the code below, I run into the error in the title.

import time
from multiprocessing import Pool
import itertools
import random

def add_nums(a, b):
    return(a + b)

if __name__ == "__main__":
    listLen = 1000
    
    # Create a list of random numbers to do pairwise additions of
    myList = [random.choice(range(1000)) for i in range(listLen)]
    # Create a list of all pairwise combinations of the indices of the list
    index_combns = [*itertools.combinations(range(len(myList)),2)]

    # Do the pairwise operation without multiprocessing
    start_time = time.time()
    sums_no_mp = [*map(lambda x: add_nums(myList[x[0]], myList[x[1]]), index_combns)]
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with no MP")

    # Do the pairwise operations with multiprocessing
    start_time = time.time()
    pool = Pool(processes=64)
    sums_mp = pool.map(lambda x: add_nums(myList[x[0]], myList[x[1]]), index_combns)
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with MP")

    pool.close()
    pool.join()

Python can't pickle lambda functions. Instead, define a named function and pass that. Here's one way you could do it:

import itertools
import random
import time
from multiprocessing import Pool


def add_nums(a, b):
    return a + b


def foo(x):
    return add_nums(x[0], x[1])


if __name__ == "__main__":
    listLen = 1000

    # Create a list of random numbers to do pairwise additions of
    myList = [random.choice(range(1000)) for i in range(listLen)]
    # Create a list of all pairwise combinations of the indices of the list
    index_combns = [
        (myList[i[0]], myList[i[1]])
        for i in itertools.combinations(range(len(myList)), 2)
    ]

    # Do the pairwise operation without multiprocessing
    start_time = time.time()
    sums_no_mp = [*map(foo, index_combns)]
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with no MP")

    # Do the pairwise operations with multiprocessing
    start_time = time.time()
    pool = Pool(processes=64)
    sums_mp = pool.map(foo, index_combns)
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with MP")

    pool.close()
    pool.join()

I modified index_combns to also pull the values out of myList, because myList won't be accessible from foo, and passing in multiple copies of myList would increase the space complexity of the script.

Running this prints:

Process took 0.053926944732666016 seconds with no MP
Process took 0.4799039363861084 seconds with MP
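
As an aside, if you would rather keep passing index pairs than value pairs, another common pattern is to hand myList to each worker exactly once through the Pool initializer - a minimal sketch, where init_worker, add_pair and the module-level _worker_list are illustrative names, not part of the code above:

import itertools
import random
from multiprocessing import Pool

_worker_list = None  # filled in once per worker process

def init_worker(data):
    # Runs once in each worker; keeps the list as a module-level global there
    global _worker_list
    _worker_list = data

def add_pair(idx_pair):
    i, j = idx_pair
    return _worker_list[i] + _worker_list[j]

if __name__ == "__main__":
    myList = [random.choice(range(1000)) for _ in range(1000)]
    index_combns = list(itertools.combinations(range(len(myList)), 2))

    with Pool(processes=4, initializer=init_worker, initargs=(myList,)) as pool:
        sums_mp = pool.map(add_pair, index_combns)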

I'm not quite sure why (a careful read of the multiprocessing docs would probably have the answer), but Python's multiprocessing pickles the function it hands off to the child processes. While I would have expected the lambda to be inherited rather than passed via pickling, apparently that's not what happens.
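
You can see that pickling constraint in isolation, without any Pool involved - a minimal sketch (the named, module-level function pickles by reference, while the lambda does not):

import pickle

def add_nums(a, b):
    return a + b

pickle.dumps(add_nums)  # works: module-level functions are pickled by reference

try:
    pickle.dumps(lambda a, b: a + b)
except Exception as e:
    # typically _pickle.PicklingError: Can't pickle <function <lambda> ...>
    print(type(e).__name__, e)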

Following the discussion in the comments, consider something like this:

import time
from multiprocessing import Pool
import itertools
import numpy as np
from multiprocessing import shared_memory

def add_mats(a, b):
    #time.sleep(0.00001)
    return (a + b)

# Helper for mp version
def add_mats_shared(shm_name, array_shape, array_dtype, i1, i2):
    shm = shared_memory.SharedMemory(name=shm_name)
    stacked = np.ndarray(array_shape, dtype=array_dtype, buffer=shm.buf)
    a = stacked[i1]
    b = stacked[i2]
    result = add_mats(a, b)
    shm.close()
    return result

if __name__ == "__main__":
    class Timer:
        def __init__(self):
            self.start = None
            self.stop  = None
            self.delta = None

        def __enter__(self):
            self.start = time.time()
            return self

        def __exit__(self, *exc_args):
            self.stop = time.time()
            self.delta = self.stop - self.start

    arrays = [np.random.rand(5,5) for _ in range(50)]
    index_combns = list(itertools.combinations(range(len(arrays)),2))

    # Helper for non-mp version
    def add_mats_pair(ij_pair):
        i, j = ij_pair
        a = arrays[i]
        b = arrays[j]
        return add_mats(a, b)

    with Timer() as t:
        # Do the pairwise operation without multiprocessing
        sums_no_mp = list(map(add_mats_pair, index_combns))

    print(f"Process took {t.delta} seconds with no MP")


    with Timer() as t:
        # Stack arrays and copy result into shared memory
        stacked = np.stack(arrays)
        shm = shared_memory.SharedMemory(create=True, size=stacked.nbytes)
        shm_arr = np.ndarray(stacked.shape, dtype=stacked.dtype, buffer=shm.buf)
        shm_arr[:] = stacked[:]

        with Pool(processes=32) as pool:
            processes = [pool.apply_async(add_mats_shared, (
                shm.name,
                stacked.shape,
                stacked.dtype,
                i,
                j,
            )) for (i,j) in index_combns]
            sums_mp = [p.get() for p in processes]

        shm.close()
        shm.unlink()

    print(f"Process took {t.delta} seconds with MP")

    for i in range(len(sums_no_mp)):
        assert (sums_no_mp[i] == sums_mp[i]).all()

    print("Results match.")

It uses multiprocessing.shared_memory to share a single (N+1)-dimensional numpy array (instead of a list of N-dimensional arrays) between the host process and the child processes.

Other things that differ but don't really matter:

  • Pool is used as a context manager, so it doesn't have to be closed and joined explicitly.
  • Timer is a simple context manager for timing blocks of code.
  • Some of the numbers were adjusted at random.
  • pool.map is replaced with calls to pool.apply_async.

pool.map would also work, but you'd want to build the argument list before the .map call and unpack it inside the worker function, e.g.:

with Pool(processes=32) as pool:
    args = [(
        shm.name,
        stacked.shape,
        stacked.dtype,
        i,
        j,
    ) for (i,j) in index_combns]
    sums_mp = pool.map(add_mats_shared, args)

# and 

# Helper for mp version
def add_mats_shared(args):
    shm_name, array_shape, array_dtype, i1, i2 = args
    shm = shared_memory.SharedMemory(name=shm_name)
    ....

Q :
" ... trying to learn about multiprocessing in order to solve a problem at work. "

A :
The most important lesson to learn here is
the cost-of-( process )-INSTANTIATION(s);
all the other add-on overhead costs
( which still must never be neglected, the larger the problem grows )
are mere details compared to this enormous and principal expense.

With the previous answer read through and fully understood, here is a Live GUI-interactive simulator of how much we will have to pay to start using more than one stream of process-flow orchestration ( costs vary - lower for threads, larger for MPI-based distributed operations, highest for multiprocessing processes, as used in the Python Interpreter, where N-many copies of the main Python Interpreter process first get copied - RAM-allocated and spawned by the O/S scheduler. As of 2022-Q2, issues are still being reported when less expensive backends try to avoid this cost, running into deadlocks on wrongly shared, badly copied, or forgotten-to-copy MUTEX-es and similar internals - so even the full copy is not always safe in 2022. Not having met these problems in person does not mean they no longer exist, as countless professionals have documented - the story about a pool of sharks is a good place to start ).

An inventory of the problems:

a ) pickling lambdas ( and many other SER/DES blockers )

This one is simple - a conda install dill and import dill as pickle is sufficient, as dill has been able to pickle lambdas for years - credit goes to @MikeMcKearns - and your code does not need to refactor its use of the plain pickle.dumps() call interface. Likewise, pathos.multiprocessing defaults to using dill internally, so this years-known SER/DES weakness of multiprocessing is avoided.
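
For illustration, a minimal sketch of the dill-backed route - the pathos import path and the nodes=4 worker count are taken from memory of the pathos API, so treat this as an assumption-laden outline rather than a definitive implementation:

import itertools
import random

from pathos.multiprocessing import ProcessingPool as Pool  # serialises with dill

if __name__ == "__main__":
    myList = [random.choice(range(1000)) for _ in range(1000)]
    # pre-build the value pairs so each worker needs nothing but its argument
    pairs = [(myList[i], myList[j])
             for i, j in itertools.combinations(range(len(myList)), 2)]

    pool = Pool(nodes=4)
    # the lambda survives here, because dill ( unlike plain pickle ) can serialise it
    sums_mp = pool.map(lambda p: p[0] + p[1], pairs)
    pool.close()
    pool.join()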

b ) performance killers

- multiprocessing.Pool.map() is more of an end-to-end performance anti-pattern here - the costs, once we stop neglecting them, show how many CPU-clocks and blocking physical-RAM-I/O transfers were paid for so many process instantiations ( 60+ ), which in the end "occupy" almost all physical CPU-cores, yet leave almost zero space for the genuinely high-performance, numpy-native multicore computing of the core problem ( whose end performance was supposed to get boosted, wasn't it? )

- just move the p-slider in the simulator to anything less than 100% ( having no [SERIAL]-part of the problem execution, which is nice in theory, yet never doable in practice - even the program launch is purely [SERIAL], by design )

- just move the Overhead-slider in the simulator to anything above plain zero ( expressing the relative add-on cost of spawning one of the N CPU-core processes, as a percentage relative to the number of instructions in the [PARALLEL]-portion of the work ) - mathematically "dense" work has many such "useful" instructions and, assuming no other performance killers jump out of the box, may afford spending some reasonable amount of "add-on" cost to spawn some amount of concurrent or parallel operations ( the actual number depends solely on the actual economy of costs, not on how many CPU-cores are present, nor on our "wishes" or on academic or, even worse, copy/paste "advice" ). Mathematically "shallow" work, by contrast, almost always ends up with "speedups" << 1 ( immense slow-downs ), since there is almost no chance to justify the known add-on costs ( paid for process instantiation and for the SER/xfer/DES of data moved in ( parameters ) and back ( results ) ) - a small numeric sketch follows right after this list

- next move the Overhead-slider in the simulator to its rightmost edge == 1. This illustrates the case when the actual process-spawning overhead - the ( time lost ) cost - is still no more than <= 1 % of all the instructions that will follow and get executed for the "useful" part of the work, computed inside such a spawned process instance. So even such a 1:100 proportion factor ( doing 100x more "useful" work than the CPU-time lost to arranging that many copies and making the O/S-scheduler orchestrate their concurrent execution inside the available system Virtual-Memory ) already shows all the warnings in the progressive graph of Speedup-degradation - just play a bit with the Overhead-slider in the simulator, before touching the other sliders ...

- avoid the sin of "sharing" ( if performance is the goal ) - again, the costs of orchestrating that among several, now independent, Python Interpreter processes take extra add-on costs that never justify themselves in performance gained, as the fight over occupying shared resources ( CPU-cores, physical-RAM-I/O channels ) only devastates CPU-core-cache re-use hit-rates and O/S-scheduler process context-switches, all of which further degrades the resulting end-to-end performance ( which is not what we want, is it? )
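
To make the slider talk concrete, here is a tiny numeric sketch of an overhead-aware Amdahl's-Law estimate - the formula below is the simplest such model, chosen only for illustration and not the simulator's exact one; p, n and the per-process overhead fraction are the knobs the sliders above represent:

def speedup(p, n, overhead_per_process=0.0):
    """
    Simplest overhead-aware Amdahl's-Law estimate ( illustrative model ):
      p                    - fraction of the work that can run in parallel ( 0..1 )
      n                    - number of worker processes
      overhead_per_process - add-on cost of spawning/feeding one process,
                             as a fraction of the whole serial runtime
    """
    return 1.0 / ((1.0 - p) + p / n + n * overhead_per_process)

# Even tiny per-process overheads dominate once n grows:
for n in (1, 4, 16, 64):
    print(n, round(speedup(p=0.95, n=n, overhead_per_process=0.01), 2))
# the 64-process case ends up slower than either 4 or 16 processes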

c ) boosting performance

- respect the facts about the actual costs of any kind of computing operation
- avoid "shallow" computing steps,
- maximise what gets so expensively shipped into a set of distributed processes ( if a need for them remains at all ),
- avoid all overhead-adding operations ( like adding local temporary variables, where inline operations permit storing partial results in place )
and
- try to go for the ultra-performant, cache-line friendly & optimised, native numpy-vectorised multicore & striding-tricks capabilities, not blocked by CPU-cores pre-overloaded by scheduling so many (~60) Python Interpreter process copies, each one trying to call numpy-code and thus leaving no free cores to actually place such high-performance, cache-reuse-friendly vectorised computing onto ( that is where we gain or lose most of the performance, not in slow-running serial iterators, nor in spawning 60+ process-based full copies of the "__main__" Python Interpreter before a single piece of useful work gets done on our great data, expensively RAM-allocated and physically copied 60+ times therein ) - a vectorised sketch follows right after this list

- refactoring of the real problem shall never go against the collected knowledge about performance, as repeating what does not work will not bring any advantage, will it?
- respect your physical platform constraints; ignoring them will degrade your performance
- benchmark, profile, refactor
- benchmark, profile, refactor
- benchmark, profile, refactor
No other magic wand is available here.
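
For the toy problem in the question, that last, numpy-vectorised route needs no process pool at all - a minimal sketch, assuming plain integer pairwise sums ( np.triu_indices yields the same pair ordering as itertools.combinations ):

import numpy as np

myArr = np.random.randint(0, 1000, size=1000)

# indices of all pairwise combinations, as two flat index arrays
i_idx, j_idx = np.triu_indices(len(myArr), k=1)

# one vectorised pass over all C(1000, 2) pairs - no Pool, no pickling
pair_sums = myArr[i_idx] + myArr[j_idx]

# first pair is (0, 1), matching itertools.combinations(range(len(myArr)), 2)
assert pair_sums[0] == myArr[0] + myArr[1]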

And once already working on the bleeding edge of performance, set gc.disable() before spawning the Python Interpreter into N-many replicated copies, instead of waiting for spontaneous garbage collections while chasing ultimate performance.
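
A sketch of one way to do that with the standard multiprocessing.Pool ( gc.disable takes no arguments, so it can be passed directly as the per-worker initializer; the payload function work below is purely illustrative ):

import gc
from multiprocessing import Pool

def work(x):
    # purely illustrative payload
    return x * x

if __name__ == "__main__":
    gc.disable()  # parent process: no spontaneous collections
    with Pool(processes=4, initializer=gc.disable) as pool:
        # each worker starts with the garbage collector switched off
        results = pool.map(work, range(1_000_000))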