Python - 使用 pandas 多处理多个大文件

Python - multiprocessing multiple large size files using pandas

我有一个 y.csv 文件。文件大小为 10 MB,其中包含来自 Jan 2020 to May 2020.

的数据

我每个月都有一个单独的文件。例如data-2020-01.csv。它包含详细的数据。每个月文件的文件大小约为1 GB.

我按月份拆分 y.csv,然后通过加载相关的月份文件来处理数据。当我花费大量时间时,这个过程花费的时间太长。例如24个月。

我想更快地处理数据。我可以访问具有 32 vCPU128 GB 内存的 AWS m6i.8xlarge 实例。

我是多处理新手。那么有人可以指导我吗?

这是我当前的代码。

import pandas as pd

periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]

y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB


def process(_month_df, _index):
    idx = _month_df.index[_month_df.index.get_loc(_index, method='nearest')]
    for _, value in _month_df.loc[idx:].itertuples():

        up_delta = 200
        down_delta = 200

        up_value = value + up_delta
        down_value = value - down_delta

        if value > up_value:
            y.loc[_index, "result"] = 1
            return

        if value < down_value:
            y.loc[_index, "result"] = 0
            return


for x in periods:
    filename = "data-" + str(x[0]) + "-" + str(x[1]).zfill(2)  # data-2020-01
    filtered_y = y[(y.index.month == x[1]) & (y.index.year == x[0])]  # Only get the current month records
    month_df = pd.read_csv(f'{filename}.csv', index_col=0, parse_dates=True)  # Filesize: ~1 GB (data-2020-01.csv)

    for index, row in filtered_y.iterrows():
        process(month_df, index)

正如在多个 pandas/threading 问题中评论的那样,CSV 文件是 IO 绑定的,您可以从使用 ThreadPoolExecutor.

中获得一些好处

同时,如果您要执行聚合操作,请考虑在处理器的 内部 执行 read_csv 并改用 ProcessPoolExecutor .

如果您要在多进程之间传递大量数据,您还需要适当的内存共享方法。

但是我看到iterrowsitertuples的用法,总的来说这两个指令让我眼睛流血。您确定不能以矢量化模式处理数据吗?

这个特定的部分我不确定它应该做什么,并且有 M 行会使它非常慢。

def process(_month_df, _index):
    idx = _month_df.index[_month_df.index.get_loc(_index, method='nearest')]
    for _, value in _month_df.loc[idx:].itertuples():

        up_delta = 200
        down_delta = 200

        up_value = value + up_delta
        down_value = value - down_delta

        if value > up_value:
            y.loc[_index, "result"] = 1
            return

        if value < down_value:
            y.loc[_index, "result"] = 0
            return

下面是一个矢量化代码,看它是上升还是下降,在哪一行

df=pd.DataFrame({'vals': np.random.random(int(10))*1000+5000}).astype('int64')
print(df.vals.values)

up_value = 6000
down_value = 3000
valsup = df.vals.values + 200*np.arange(df.shape[0])+200
valsdown = df.vals.values - 200*np.arange(df.shape[0])-200

#! argmax returns 0 if all false
# idx_up = np.argmax(valsup > up_value)
# idx_dwn= np.argmax(valsdown < down_value)

idx_up = np.argwhere(valsup > up_value)
idx_dwn= np.argwhere(valsdown < down_value)
idx_up = idx_up[0][0] if len(idx_up) else -1
idx_dwn = idx_dwn[0][0] if len(idx_dwn) else -1


if idx_up < 0 and idx_dwn<0:
    print(f" Not up nor down")
if idx_up < idx_dwn or idx_dwn<0:
    print(f" Result is positive, in position {idx_up}")
else: 
    print(f" Result is negative, in position {idx_dwn}")

为了完整起见,对 1000 个元素进行基准测试 itertuples()argwhere 方法:

  • .itertuples():757µs
  • arange + argwhere: 60µs

多线程池非常适合在线程之间共享 y 数据帧(避免使用共享内存的需要),但在 运行 处理越多 CPU-intensive 方面就不那么好了在平行下。多处理池非常适合进行 CPU-intensive 处理,但在不提供 y 数据帧的碎片内存表示的情况下跨进程共享数据就不是那么好。

我在这里重新安排了您的代码,以便我使用多线程池为每个周期创建 filtered_y 一个 CPU-intensive 操作,但是pandas 确实为某些操作释放了全局解释器锁——希望是这个)。然后我们只将 one-months 的数据传递给多处理池,而不是整个 y 数据帧,以使用工作函数 process_month 处理那个月。但是由于每个池进程都无法访问 y 数据帧,它只是 returns 需要用要替换的值更新的索引。

import pandas as pd
from multiprocessing.pool import Pool, ThreadPool

def process_month(period, filtered_y):
    """
    returns a list of tuples consisting of (index, value) pairs
    """
    filename = "data-" + str(period[0]) + "-" + str(period[1]).zfill(2)  # data-2020-01
    month_df = pd.read_csv(f'{filename}.csv', index_col=0, parse_dates=True)  # Filesize: ~1 GB (data-2020-01.csv)
    results = []
    for index, row in filtered_y.iterrows():   
        idx = month_df.index[month_df.index.get_loc(index, method='nearest')]
        for _, value in month_df.loc[idx:].itertuples():
    
            up_delta = 200
            down_delta = 200
    
            up_value = value + up_delta
            down_value = value - down_delta
    
            if value > up_value:
                results.append((index, 1))
                break
    
            if value < down_value:
                results.append((index, 0))
                break
    return results

def process(period):
    filtered_y = y[(y.index.month == period[1]) & (y.index.year == period[0])]  # Only get the current month records
    for index, value in multiprocessing_pool.apply(process_month, (period, filtered_y)):
        y.loc[index, "result"] = value

def main():
    global y, multiprocessing_pool

    periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB

    with Pool() as multiprocessing_pool, ThreadPool(len(periods)) as thread_pool:
        thread_pool.map(process, periods)
        
    # Presumably y gets written out again as a CSV file here?

# Required for Windows:
if __name__ == '__main__':
    main()