pandas 使用并行处理按列值拆分数据帧
pandas data frame splitting by column values using Parallel Processing
我有一个非常大的 pandas 数据框,我正在尝试按股票名称将其拆分为多个并将它们保存到 csv。
stock date time spread time_diff
VOD 01-01 9:05 0.01 0:07
VOD 01-01 9:12 0.03 0:52
VOD 01-01 10:04 0.02 0:11
VOD 01-01 10:15 0.01 0:10
BAT 01-01 10:25 0.03 0:39
BAT 01-01 11:04 0.02 22:00
BAT 01-02 9:04 0.02 0:05
BAT 01-01 10:15 0.01 0:10
BOA 01-01 10:25 0.03 0:39
BOA 01-01 11:04 0.02 22:00
BOA 01-02 9:04 0.02 0:05
我知道如何以传统方式做到这一点
def split_save(df):
ids = df['stock'].unique()
for id in ids:
df = df[df['stock']==id]
df.to_csv(f'{my_path}/{id}.csv')
但是,由于我有一个非常大的数据框和数千只股票,我想进行多处理以加速。
有什么想法吗? (稍后我可能还会尝试使用 pyspark。)
谢谢!
参与 I/O 我不希望数据帧的选择成为主要障碍。
到目前为止,我可以为您提供两种加速方案:
线程:只需在不同的线程或 ThreadPoolExecutor
中启动每只股票
def dump_csv(df, ticker):
df.groupby(ticker).to_csv(f'{my_path}/{ticker}.csv')
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(df, ticker):ticker for ticker in df['stock'].unique()}
for future in concurrent.futures.as_completed(futures):
print(f"Dumped ticker {futures[future]}")
(代码未经测试,改编自示例)
在 ZIP 文件中工作:对于存储很多文件,zip 存档是一个很好的选择,但它应该得到“reader”的支持。
为了完整起见:
with ZipFile('stocks.zip', 'w', compression=zipfile.ZIP_DEFLATED) as zf:
ids = df['stock'].unique()
for id in ids:
zf.writestr(f'{id}.csv', df.groupby(ticker).to_csv())
我怀疑 groupby
是阻碍你前进的原因,但对于写作,我们可以通过 multithreading
加快速度,如下所示:
from concurrent.futures import ThreadPoolExecutor
# Number of cores/threads your CPU has/that you want to use.
workers = 4
def save_group(grouped):
name, group = grouped
group.to_csv(f'{name}.csv')
with ThreadPoolExecutor(workers) as pool:
processed = pool.map(save_group, df.groupby('stock'))
我有一个非常大的 pandas 数据框,我正在尝试按股票名称将其拆分为多个并将它们保存到 csv。
stock date time spread time_diff
VOD 01-01 9:05 0.01 0:07
VOD 01-01 9:12 0.03 0:52
VOD 01-01 10:04 0.02 0:11
VOD 01-01 10:15 0.01 0:10
BAT 01-01 10:25 0.03 0:39
BAT 01-01 11:04 0.02 22:00
BAT 01-02 9:04 0.02 0:05
BAT 01-01 10:15 0.01 0:10
BOA 01-01 10:25 0.03 0:39
BOA 01-01 11:04 0.02 22:00
BOA 01-02 9:04 0.02 0:05
我知道如何以传统方式做到这一点
def split_save(df):
ids = df['stock'].unique()
for id in ids:
df = df[df['stock']==id]
df.to_csv(f'{my_path}/{id}.csv')
但是,由于我有一个非常大的数据框和数千只股票,我想进行多处理以加速。
有什么想法吗? (稍后我可能还会尝试使用 pyspark。)
谢谢!
参与 I/O 我不希望数据帧的选择成为主要障碍。
到目前为止,我可以为您提供两种加速方案:
线程:只需在不同的线程或 ThreadPoolExecutor
中启动每只股票def dump_csv(df, ticker):
df.groupby(ticker).to_csv(f'{my_path}/{ticker}.csv')
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(df, ticker):ticker for ticker in df['stock'].unique()}
for future in concurrent.futures.as_completed(futures):
print(f"Dumped ticker {futures[future]}")
(代码未经测试,改编自示例)
在 ZIP 文件中工作:对于存储很多文件,zip 存档是一个很好的选择,但它应该得到“reader”的支持。
为了完整起见:
with ZipFile('stocks.zip', 'w', compression=zipfile.ZIP_DEFLATED) as zf:
ids = df['stock'].unique()
for id in ids:
zf.writestr(f'{id}.csv', df.groupby(ticker).to_csv())
我怀疑 groupby
是阻碍你前进的原因,但对于写作,我们可以通过 multithreading
加快速度,如下所示:
from concurrent.futures import ThreadPoolExecutor
# Number of cores/threads your CPU has/that you want to use.
workers = 4
def save_group(grouped):
name, group = grouped
group.to_csv(f'{name}.csv')
with ThreadPoolExecutor(workers) as pool:
processed = pool.map(save_group, df.groupby('stock'))