python如何提高读取多个csv文件的速度

How to improve the speed of reading multiple csv files in python

这是我第一次创建用于处理包含大量数据的文件的代码,所以我有点卡在这里。

我想做的是读取路径列表,列出所有需要读取的 csv 文件,从每个文件中检索 HEAD 和 TAIL 并将其放入列表中。

我总共有 621 个 csv 文件,每个文件由 5800 行和 251 列组成 这是数据样本

[LOGGING],RD81DL96_1,3,4,5,2,,,,
LOG01,,,,,,,,,
DATETIME,INDEX,SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0]
TIME,INDEX,FF-1(1A) ,FF-1(1B) ,FF-1(1C) ,FF-1(2A),FF-2(1A) ,FF-2(1B) ,FF-2(1C),FF-2(2A)
47:29.6,1,172,0,139,1258,0,0,400,0
47:34.6,2,172,0,139,1258,0,0,400,0
47:39.6,3,172,0,139,1258,0,0,400,0
47:44.6,4,172,0,139,1263,0,0,400,0
47:49.6,5,172,0,139,1263,0,0,450,0
47:54.6,6,172,0,139,1263,0,0,450,0

问题是,读取所有文件大约需要 13 秒(老实说还是有点慢)

但是当我添加一行附加代码时,这个过程花了很多时间才完成,大约 4 分钟。

下面是代码片段:

# CsvList: [File Path, Change Date, File size, File Name]
for x, file in enumerate(CsvList):
     timeColumn = ['TIME']
     df = dd.read_csv(file[0], sep =',', skiprows = 3, encoding= 'CP932', engine='python', usecols=timeColumn)

     # The process became long when this code is added
     startEndList.append(list(df.head(1)) + list(df.tail(1))) 

为什么会这样?我正在使用 dask.dataframe

第一种方法仅使用 Python 作为起点:

import pandas as pd
import io

def read_first_and_last_lines(filename):
    with open(filename, 'rb') as fp:
        # skip first 4 rows (headers)
        [next(fp) for _ in range(4)]
        # first line
        first_line = fp.readline()
        # start at -2x length of first line from the end of file
        fp.seek(-2 * len(first_line), 2)
        # last line
        last_line = fp.readlines()[-1]
        return first_line + last_line
        
data = []
for filename in pathlib.Path('data').glob('*.csv'):
    data.append(read_first_and_last_lines(filename))

buf = io.BytesIO()
buf.writelines(data)
buf.seek(0)
df = pd.read_csv(buf, header=None, encoding='CP932')

目前,您的代码并未真正利用 Dask 的并行化功能,因为:

  • df.headdf.tail 调用将触发“计算”(即,将您的 Dask DataFrame 转换为 pandas DataFrame——这是我们在惰性评估中试图最小化的与 Dask)和
  • for-loop 是 运行 顺序,因为您正在创建 Dask DataFrames 并将它们转换为 pandas DataFrames,所有这些都在循环中。

因此,您当前的示例类似于仅在 for-loop 中使用 pandas,但增加了 Dask-to-pandas-conversion 开销。

由于您需要处理每个文件,我建议您查看 Dask Delayed,这在这里可能更优雅+有用。以下 (pseudo-code) 将对每个文件并行执行 pandas 操作:

import dask
import pandas as pd

for file in list_of_files:
    df = dask.delayed(pd.read_csv)(file)
    result.append(df.head(1) + df.tail(1))

dask.compute(*result)

当我使用 4 csv-files 时 dask.visualize(*result) 的输出确认并行性:

如果你真的想在这里使用Dask DataFrame,你可以尝试:

  • 将所有文件读入单个 Dask DataFrame,
  • 确保每个Dask“分区”对应一个文件,
  • 使用 Dask Dataframe apply 获取 headtail 值并将它们附加到新列表
  • 在新列表上调用计算