How to apply a class method to chunks of a pandas dataframe in parallel?

I have a dataframe df that I would like to split into smaller chunks column-wise so that I can apply a class method to each chunk. The method seems to work fine, but after about 10 iterations I keep getting CustomizablePickler(buffer, self._reducers).dump(obj), AttributeError: Can't pickle local object 'delayed.<locals>.delayed_function', so I am not sure what is going wrong.

What I have so far:

from typing import Tuple

import pandas as pd
import numpy as np


class MyClass:
    def __init__(self, df: pd.DataFrame):
        self.df = df
    ...
    def run(self) -> Tuple[np.ndarray, np.ndarray]:
        <operate on self.df>


if __name__ == "__main__":
    from joblib import Parallel, delayed
    df = pd.read_csv("data.csv")

    n_cpu = 6
    chunk = df.shape[1] // n_cpu
    # create chunk column indices
    lower_idx = np.array([0, chunk, 2 * chunk, 3 * chunk, 4 * chunk, 5 * chunk])
    upper_idx = lower_idx + chunk
    upper_idx[-1] = df.shape[1]
    res = Parallel(n_jobs=n_cpu, backend="multiprocessing")(
        delayed(MyClass(df.iloc[:, i: j]).run()) for i, j in zip(lower_idx, upper_idx)
    )

The dataframe df is small and easily fits into memory (only 54 MB), but the operations in run() take a long time, so I would like to parallelize them over a subset of the df columns at a time.

Am I doing something wrong? Is there an easier way to do this? I do not have to use the joblib package, but if possible I would like to keep MyClass unchanged.

Use a proxy function to run your class instance:

import pandas as pd
import numpy as np
import multiprocessing as mp
from typing import Tuple

class MyClass:
    def __init__(self, df: pd.DataFrame):
        self.df = df

    def run(self) -> Tuple[np.ndarray, np.ndarray]:
        return (self.df.min(axis=1).values, self.df.max(axis=1).values)

def proxy(df):
    c = MyClass(df)
    return c.run()

if __name__ == '__main__':
    np.random.seed(2022)
    df = pd.DataFrame(np.random.randint(1, 1000, (10, 100)))

    n_cpu = 6
    chunks = np.array_split(df, n_cpu, axis=1)

    with mp.Pool(n_cpu) as pool:
        data = pool.map(proxy, chunks)

Output:

>>> data
[(array([12, 15,  1,  4, 13,  3,  9,  2, 41, 16]),
  array([998, 981, 986, 991, 969, 995, 976, 996, 998, 995])),
 (array([12, 15,  1,  4, 13,  3,  9,  2, 41, 16]),
  array([998, 981, 986, 991, 969, 995, 976, 996, 998, 995])),
 (array([12, 15,  1,  4, 13,  3,  9,  2, 41, 16]),
  array([998, 981, 986, 991, 969, 995, 976, 996, 998, 995])),
 (array([12, 15,  1,  4, 13,  3,  9,  2, 41, 16]),
  array([998, 981, 986, 991, 969, 995, 976, 996, 998, 995])),
 (array([12, 15,  1,  4, 13,  3,  9,  2, 41, 16]),
  array([998, 981, 986, 991, 969, 995, 976, 996, 998, 995])),
 (array([12, 15,  1,  4, 13,  3,  9,  2, 41, 16]),
  array([998, 981, 986, 991, 969, 995, 976, 996, 998, 995]))]
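
If you would prefer to stick with joblib, the same proxy idea carries over. The original error most likely comes from the fact that delayed() is given the return value of run() rather than a callable, so joblib ends up trying to pickle its local delayed_function wrapper and fails. A minimal sketch of the joblib variant, reusing MyClass and proxy from the example above:

import numpy as np
import pandas as pd
from joblib import Parallel, delayed

if __name__ == "__main__":
    np.random.seed(2022)
    df = pd.DataFrame(np.random.randint(1, 1000, (10, 100)))

    n_cpu = 6
    chunks = np.array_split(df, n_cpu, axis=1)

    # delayed() must wrap the callable itself; the arguments are applied afterwards
    res = Parallel(n_jobs=n_cpu)(delayed(proxy)(chunk) for chunk in chunks)

Wrapping proxy (a module-level function) keeps everything picklable, whereas delayed(MyClass(...).run()) executes run() in the parent process and only then wraps its result.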

Update

Can you replace the with block with the following code:

    p = mp.get_context('fork').Pool(8)
    data = p.map(proxy, chunks)
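
Note that the 'fork' start method is only available on Unix-like systems; on macOS the default has been 'spawn' since Python 3.8, which is the usual reason for forcing 'fork' here. A minimal sketch that does the same thing but also cleans up the pool, assuming n_cpu, proxy, and chunks from the example above:

import multiprocessing as mp

if __name__ == "__main__":
    # force the fork start method (Unix only) and release the pool when done
    ctx = mp.get_context("fork")
    with ctx.Pool(n_cpu) as pool:
        data = pool.map(proxy, chunks)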