Python - 使用 pandas 多处理多个大文件
Python - multiprocessing multiple large size files using pandas
我有一个 y.csv
文件。文件大小为 10 MB,其中包含来自 Jan 2020 to May 2020
.
的数据
我每个月都有一个单独的文件。例如data-2020-01.csv
。它包含详细的数据。每个月文件的文件大小约为1 GB
.
我按月份拆分 y.csv
,然后通过加载相关的月份文件来处理数据。当我花费大量时间时,这个过程花费的时间太长。例如24个月。
我想更快地处理数据。我可以访问具有 32 vCPU
和 128 GB
内存的 AWS m6i.8xlarge
实例。
我是多处理新手。那么有人可以指导我吗?
这是我当前的代码。
import pandas as pd
periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]
y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0) # Filesize: ~10 MB
def process(_month_df, _index):
idx = _month_df.index[_month_df.index.get_loc(_index, method='nearest')]
for _, value in _month_df.loc[idx:].itertuples():
up_delta = 200
down_delta = 200
up_value = value + up_delta
down_value = value - down_delta
if value > up_value:
y.loc[_index, "result"] = 1
return
if value < down_value:
y.loc[_index, "result"] = 0
return
for x in periods:
filename = "data-" + str(x[0]) + "-" + str(x[1]).zfill(2) # data-2020-01
filtered_y = y[(y.index.month == x[1]) & (y.index.year == x[0])] # Only get the current month records
month_df = pd.read_csv(f'{filename}.csv', index_col=0, parse_dates=True) # Filesize: ~1 GB (data-2020-01.csv)
for index, row in filtered_y.iterrows():
process(month_df, index)
正如在多个 pandas/threading 问题中评论的那样,CSV 文件是 IO 绑定的,您可以从使用 ThreadPoolExecutor.
中获得一些好处
同时,如果您要执行聚合操作,请考虑在处理器的 内部 执行 read_csv
并改用 ProcessPoolExecutor .
如果您要在多进程之间传递大量数据,您还需要适当的内存共享方法。
但是我看到iterrows
和itertuples
的用法,总的来说这两个指令让我眼睛流血。您确定不能以矢量化模式处理数据吗?
这个特定的部分我不确定它应该做什么,并且有 M 行会使它非常慢。
def process(_month_df, _index):
idx = _month_df.index[_month_df.index.get_loc(_index, method='nearest')]
for _, value in _month_df.loc[idx:].itertuples():
up_delta = 200
down_delta = 200
up_value = value + up_delta
down_value = value - down_delta
if value > up_value:
y.loc[_index, "result"] = 1
return
if value < down_value:
y.loc[_index, "result"] = 0
return
下面是一个矢量化代码,看它是上升还是下降,在哪一行
df=pd.DataFrame({'vals': np.random.random(int(10))*1000+5000}).astype('int64')
print(df.vals.values)
up_value = 6000
down_value = 3000
valsup = df.vals.values + 200*np.arange(df.shape[0])+200
valsdown = df.vals.values - 200*np.arange(df.shape[0])-200
#! argmax returns 0 if all false
# idx_up = np.argmax(valsup > up_value)
# idx_dwn= np.argmax(valsdown < down_value)
idx_up = np.argwhere(valsup > up_value)
idx_dwn= np.argwhere(valsdown < down_value)
idx_up = idx_up[0][0] if len(idx_up) else -1
idx_dwn = idx_dwn[0][0] if len(idx_dwn) else -1
if idx_up < 0 and idx_dwn<0:
print(f" Not up nor down")
if idx_up < idx_dwn or idx_dwn<0:
print(f" Result is positive, in position {idx_up}")
else:
print(f" Result is negative, in position {idx_dwn}")
为了完整起见,对 1000 个元素进行基准测试 itertuples()
和 argwhere
方法:
.itertuples()
:757µs
arange
+ argwhere
: 60µs
多线程池非常适合在线程之间共享 y
数据帧(避免使用共享内存的需要),但在 运行 处理越多 CPU-intensive 方面就不那么好了在平行下。多处理池非常适合进行 CPU-intensive 处理,但在不提供 y
数据帧的碎片内存表示的情况下跨进程共享数据就不是那么好。
我在这里重新安排了您的代码,以便我使用多线程池为每个周期创建 filtered_y
( 是 一个 CPU-intensive 操作,但是pandas 确实为某些操作释放了全局解释器锁——希望是这个)。然后我们只将 one-months 的数据传递给多处理池,而不是整个 y
数据帧,以使用工作函数 process_month
处理那个月。但是由于每个池进程都无法访问 y
数据帧,它只是 returns 需要用要替换的值更新的索引。
import pandas as pd
from multiprocessing.pool import Pool, ThreadPool
def process_month(period, filtered_y):
"""
returns a list of tuples consisting of (index, value) pairs
"""
filename = "data-" + str(period[0]) + "-" + str(period[1]).zfill(2) # data-2020-01
month_df = pd.read_csv(f'{filename}.csv', index_col=0, parse_dates=True) # Filesize: ~1 GB (data-2020-01.csv)
results = []
for index, row in filtered_y.iterrows():
idx = month_df.index[month_df.index.get_loc(index, method='nearest')]
for _, value in month_df.loc[idx:].itertuples():
up_delta = 200
down_delta = 200
up_value = value + up_delta
down_value = value - down_delta
if value > up_value:
results.append((index, 1))
break
if value < down_value:
results.append((index, 0))
break
return results
def process(period):
filtered_y = y[(y.index.month == period[1]) & (y.index.year == period[0])] # Only get the current month records
for index, value in multiprocessing_pool.apply(process_month, (period, filtered_y)):
y.loc[index, "result"] = value
def main():
global y, multiprocessing_pool
periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]
y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0) # Filesize: ~10 MB
with Pool() as multiprocessing_pool, ThreadPool(len(periods)) as thread_pool:
thread_pool.map(process, periods)
# Presumably y gets written out again as a CSV file here?
# Required for Windows:
if __name__ == '__main__':
main()
我有一个 y.csv
文件。文件大小为 10 MB,其中包含来自 Jan 2020 to May 2020
.
我每个月都有一个单独的文件。例如data-2020-01.csv
。它包含详细的数据。每个月文件的文件大小约为1 GB
.
我按月份拆分 y.csv
,然后通过加载相关的月份文件来处理数据。当我花费大量时间时,这个过程花费的时间太长。例如24个月。
我想更快地处理数据。我可以访问具有 32 vCPU
和 128 GB
内存的 AWS m6i.8xlarge
实例。
我是多处理新手。那么有人可以指导我吗?
这是我当前的代码。
import pandas as pd
periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]
y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0) # Filesize: ~10 MB
def process(_month_df, _index):
idx = _month_df.index[_month_df.index.get_loc(_index, method='nearest')]
for _, value in _month_df.loc[idx:].itertuples():
up_delta = 200
down_delta = 200
up_value = value + up_delta
down_value = value - down_delta
if value > up_value:
y.loc[_index, "result"] = 1
return
if value < down_value:
y.loc[_index, "result"] = 0
return
for x in periods:
filename = "data-" + str(x[0]) + "-" + str(x[1]).zfill(2) # data-2020-01
filtered_y = y[(y.index.month == x[1]) & (y.index.year == x[0])] # Only get the current month records
month_df = pd.read_csv(f'{filename}.csv', index_col=0, parse_dates=True) # Filesize: ~1 GB (data-2020-01.csv)
for index, row in filtered_y.iterrows():
process(month_df, index)
正如在多个 pandas/threading 问题中评论的那样,CSV 文件是 IO 绑定的,您可以从使用 ThreadPoolExecutor.
中获得一些好处同时,如果您要执行聚合操作,请考虑在处理器的 内部 执行 read_csv
并改用 ProcessPoolExecutor .
如果您要在多进程之间传递大量数据,您还需要适当的内存共享方法。
但是我看到iterrows
和itertuples
的用法,总的来说这两个指令让我眼睛流血。您确定不能以矢量化模式处理数据吗?
这个特定的部分我不确定它应该做什么,并且有 M 行会使它非常慢。
def process(_month_df, _index):
idx = _month_df.index[_month_df.index.get_loc(_index, method='nearest')]
for _, value in _month_df.loc[idx:].itertuples():
up_delta = 200
down_delta = 200
up_value = value + up_delta
down_value = value - down_delta
if value > up_value:
y.loc[_index, "result"] = 1
return
if value < down_value:
y.loc[_index, "result"] = 0
return
下面是一个矢量化代码,看它是上升还是下降,在哪一行
df=pd.DataFrame({'vals': np.random.random(int(10))*1000+5000}).astype('int64')
print(df.vals.values)
up_value = 6000
down_value = 3000
valsup = df.vals.values + 200*np.arange(df.shape[0])+200
valsdown = df.vals.values - 200*np.arange(df.shape[0])-200
#! argmax returns 0 if all false
# idx_up = np.argmax(valsup > up_value)
# idx_dwn= np.argmax(valsdown < down_value)
idx_up = np.argwhere(valsup > up_value)
idx_dwn= np.argwhere(valsdown < down_value)
idx_up = idx_up[0][0] if len(idx_up) else -1
idx_dwn = idx_dwn[0][0] if len(idx_dwn) else -1
if idx_up < 0 and idx_dwn<0:
print(f" Not up nor down")
if idx_up < idx_dwn or idx_dwn<0:
print(f" Result is positive, in position {idx_up}")
else:
print(f" Result is negative, in position {idx_dwn}")
为了完整起见,对 1000 个元素进行基准测试 itertuples()
和 argwhere
方法:
.itertuples()
:757µsarange
+argwhere
: 60µs
多线程池非常适合在线程之间共享 y
数据帧(避免使用共享内存的需要),但在 运行 处理越多 CPU-intensive 方面就不那么好了在平行下。多处理池非常适合进行 CPU-intensive 处理,但在不提供 y
数据帧的碎片内存表示的情况下跨进程共享数据就不是那么好。
我在这里重新安排了您的代码,以便我使用多线程池为每个周期创建 filtered_y
( 是 一个 CPU-intensive 操作,但是pandas 确实为某些操作释放了全局解释器锁——希望是这个)。然后我们只将 one-months 的数据传递给多处理池,而不是整个 y
数据帧,以使用工作函数 process_month
处理那个月。但是由于每个池进程都无法访问 y
数据帧,它只是 returns 需要用要替换的值更新的索引。
import pandas as pd
from multiprocessing.pool import Pool, ThreadPool
def process_month(period, filtered_y):
"""
returns a list of tuples consisting of (index, value) pairs
"""
filename = "data-" + str(period[0]) + "-" + str(period[1]).zfill(2) # data-2020-01
month_df = pd.read_csv(f'{filename}.csv', index_col=0, parse_dates=True) # Filesize: ~1 GB (data-2020-01.csv)
results = []
for index, row in filtered_y.iterrows():
idx = month_df.index[month_df.index.get_loc(index, method='nearest')]
for _, value in month_df.loc[idx:].itertuples():
up_delta = 200
down_delta = 200
up_value = value + up_delta
down_value = value - down_delta
if value > up_value:
results.append((index, 1))
break
if value < down_value:
results.append((index, 0))
break
return results
def process(period):
filtered_y = y[(y.index.month == period[1]) & (y.index.year == period[0])] # Only get the current month records
for index, value in multiprocessing_pool.apply(process_month, (period, filtered_y)):
y.loc[index, "result"] = value
def main():
global y, multiprocessing_pool
periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]
y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0) # Filesize: ~10 MB
with Pool() as multiprocessing_pool, ThreadPool(len(periods)) as thread_pool:
thread_pool.map(process, periods)
# Presumably y gets written out again as a CSV file here?
# Required for Windows:
if __name__ == '__main__':
main()