How to apply a class method to chunks of a pandas dataframe in parallel?
I have a dataframe df that I want to split into smaller chunks, column-wise, so that I can apply a class method to each chunk. The method seems to work fine, but after about 10 iterations I keep getting

CustomizablePickler(buffer, self._reducers).dump(obj)
AttributeError: Can't pickle local object 'delayed.<locals>.delayed_function'

so I'm not sure where it's going wrong.

What I have so far:
from typing import Tuple

import pandas as pd
import numpy as np


class MyClass:
    def __init__(self, df: pd.DataFrame):
        self.df = df
        ...

    def run(self) -> Tuple[np.ndarray, np.ndarray]:
        ...  # <operate on self.df>


if __name__ == "__main__":
    from joblib import Parallel, delayed

    df = pd.read_csv("data.csv")
    n_cpu = 6
    chunk = df.shape[1] // n_cpu

    # create chunk column indices
    lower_idx = np.array([0, chunk, 2 * chunk, 3 * chunk, 4 * chunk, 5 * chunk])
    upper_idx = lower_idx + chunk
    upper_idx[-1] = df.shape[1]

    res = Parallel(n_jobs=n_cpu, backend="multiprocessing")(
        delayed(MyClass(df.iloc[:, i:j]).run()) for i, j in zip(lower_idx, upper_idx)
    )
The dataframe df is small and easily fits in memory (only 54 MB), but the operations in run() take a long time, so I want to parallelize them across column subsets of df.

Am I doing something wrong? Is there a simpler way to do this? I don't have to use the joblib package, but if possible I would like to keep MyClass unchanged.
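A note on the traceback before the fix below: joblib's delayed wraps a callable and defers the call, so the usual pattern is delayed(func)(args). The line delayed(MyClass(df.iloc[:, i:j]).run()) instead executes run() immediately in the parent process and hands its return value to delayed; the delayed_function closure that delayed builds around it is defined inside delayed itself and cannot be pickled, which is likely the AttributeError above. A minimal sketch of the expected call pattern, reusing the names from the snippet above plus a hypothetical top-level helper run_chunk:

from joblib import Parallel, delayed

def run_chunk(chunk):
    # hypothetical module-level helper: picklable, builds the object and runs it
    return MyClass(chunk).run()

res = Parallel(n_jobs=n_cpu, backend="multiprocessing")(
    delayed(run_chunk)(df.iloc[:, i:j]) for i, j in zip(lower_idx, upper_idx)
)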
Use a proxy function to run your class instance:
import pandas as pd
import numpy as np
import multiprocessing as mp
from typing import Tuple


class MyClass:
    def __init__(self, df: pd.DataFrame):
        self.df = df

    def run(self) -> Tuple[np.ndarray, np.ndarray]:
        # per-row min and max of this instance's chunk
        return (self.df.min(axis=1).values, self.df.max(axis=1).values)


def proxy(df):
    # module-level function: picklable, so it can be sent to worker processes
    c = MyClass(df)
    return c.run()


if __name__ == '__main__':
    np.random.seed(2022)
    df = pd.DataFrame(np.random.randint(1, 1000, (10, 100)))

    n_cpu = 6
    # split df column-wise into n_cpu roughly equal chunks
    chunks = np.array_split(df, n_cpu, axis=1)

    with mp.Pool(n_cpu) as pool:
        data = pool.map(proxy, chunks)
Output:
>>> data
[(array([12, 15, 1, 4, 13, 3, 9, 2, 41, 16]),
array([998, 981, 986, 991, 969, 995, 976, 996, 998, 995])),
(array([12, 15, 1, 4, 13, 3, 9, 2, 41, 16]),
array([998, 981, 986, 991, 969, 995, 976, 996, 998, 995])),
(array([12, 15, 1, 4, 13, 3, 9, 2, 41, 16]),
array([998, 981, 986, 991, 969, 995, 976, 996, 998, 995])),
(array([12, 15, 1, 4, 13, 3, 9, 2, 41, 16]),
array([998, 981, 986, 991, 969, 995, 976, 996, 998, 995])),
(array([12, 15, 1, 4, 13, 3, 9, 2, 41, 16]),
array([998, 981, 986, 991, 969, 995, 976, 996, 998, 995])),
(array([12, 15, 1, 4, 13, 3, 9, 2, 41, 16]),
array([998, 981, 986, 991, 969, 995, 976, 996, 998, 995]))]
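A side note on np.array_split: unlike np.split, it accepts a chunk count that does not divide the axis length evenly, so it replaces the manual lower_idx/upper_idx bookkeeping from the question. A quick illustration:

import numpy as np
import pandas as pd

df = pd.DataFrame(np.zeros((2, 100)))

# 100 columns into 6 chunks, sized as evenly as possible
chunks = np.array_split(df, 6, axis=1)
print([c.shape[1] for c in chunks])  # [17, 17, 17, 17, 16, 16]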
Update
Can you replace the with block with the following code:
p = mp.get_context('fork').Pool(8)
data = p.map(proxy, chunks)
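For background on that suggestion: mp.get_context('fork') forces the fork start method, under which worker processes inherit the parent's memory rather than re-importing the module, which sidesteps many pickling problems. fork is only available on POSIX systems, not on Windows. A sketch of the same replacement, assuming a POSIX machine and keeping the pool managed by a with block:

# 'fork' is POSIX-only; children inherit the parent's globals
with mp.get_context('fork').Pool(8) as pool:
    data = pool.map(proxy, chunks)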