大型 Pandas 数据帧并行处理
Large Pandas Dataframe parallel processing
我正在访问一个非常大的 Pandas 数据框作为全局变量。该变量通过 joblib 并行访问。
例如
df = db.query("select id, a_lot_of_data from table")
def process(id):
temp_df = df.loc[id]
temp_df.apply(another_function)
Parallel(n_jobs=8)(delayed(process)(id) for id in df['id'].to_list())
以这种方式访问原始df似乎是跨进程复制数据。这是出乎意料的,因为原始 df 没有在任何子进程中被更改? (或者是?)
Python 多处理通常使用单独的进程完成,如您所述,这意味着进程不共享内存。如果您可以使用 np.memmap
进行工作,那么有一个潜在的解决方法,正如 joblib 文档中提到的那样,尽管转储到磁盘显然会增加其自身的一些开销:https://pythonhosted.org/joblib/parallel.html#working-with-numerical-data-in-shared-memory-memmaping
需要为joblib 创建的每个进程对整个DataFrame 进行pickle 和unpickled。实际上,这非常慢,并且还需要每个内存的许多倍。
一种解决方案是使用 table 格式将数据存储在 HDF (df.to_hdf
) 中。然后,您可以使用 select
到 select 数据子集进行进一步处理。实际上,这对于交互式使用来说太慢了。它也非常复杂,您的工作人员需要存储他们的工作,以便在最后一步进行合并。
另一种方法是使用 target='parallel'
探索 numba.vectorize
。这将需要使用 NumPy 数组而不是 Pandas 对象,因此它也有一些复杂性成本。
在漫长的运行中,dask有望将并行执行带到Pandas,但这不是很快就能实现的。
我正在访问一个非常大的 Pandas 数据框作为全局变量。该变量通过 joblib 并行访问。
例如
df = db.query("select id, a_lot_of_data from table")
def process(id):
temp_df = df.loc[id]
temp_df.apply(another_function)
Parallel(n_jobs=8)(delayed(process)(id) for id in df['id'].to_list())
以这种方式访问原始df似乎是跨进程复制数据。这是出乎意料的,因为原始 df 没有在任何子进程中被更改? (或者是?)
Python 多处理通常使用单独的进程完成,如您所述,这意味着进程不共享内存。如果您可以使用 np.memmap
进行工作,那么有一个潜在的解决方法,正如 joblib 文档中提到的那样,尽管转储到磁盘显然会增加其自身的一些开销:https://pythonhosted.org/joblib/parallel.html#working-with-numerical-data-in-shared-memory-memmaping
需要为joblib 创建的每个进程对整个DataFrame 进行pickle 和unpickled。实际上,这非常慢,并且还需要每个内存的许多倍。
一种解决方案是使用 table 格式将数据存储在 HDF (df.to_hdf
) 中。然后,您可以使用 select
到 select 数据子集进行进一步处理。实际上,这对于交互式使用来说太慢了。它也非常复杂,您的工作人员需要存储他们的工作,以便在最后一步进行合并。
另一种方法是使用 target='parallel'
探索 numba.vectorize
。这将需要使用 NumPy 数组而不是 Pandas 对象,因此它也有一些复杂性成本。
在漫长的运行中,dask有望将并行执行带到Pandas,但这不是很快就能实现的。