python如何提高读取多个csv文件的速度
How to improve the speed of reading multiple csv files in python
这是我第一次创建用于处理包含大量数据的文件的代码,所以我有点卡在这里。
我想做的是读取路径列表,列出所有需要读取的 csv 文件,从每个文件中检索 HEAD 和 TAIL 并将其放入列表中。
我总共有 621 个 csv 文件,每个文件由 5800 行和 251 列组成
这是数据样本
[LOGGING],RD81DL96_1,3,4,5,2,,,,
LOG01,,,,,,,,,
DATETIME,INDEX,SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0]
TIME,INDEX,FF-1(1A) ,FF-1(1B) ,FF-1(1C) ,FF-1(2A),FF-2(1A) ,FF-2(1B) ,FF-2(1C),FF-2(2A)
47:29.6,1,172,0,139,1258,0,0,400,0
47:34.6,2,172,0,139,1258,0,0,400,0
47:39.6,3,172,0,139,1258,0,0,400,0
47:44.6,4,172,0,139,1263,0,0,400,0
47:49.6,5,172,0,139,1263,0,0,450,0
47:54.6,6,172,0,139,1263,0,0,450,0
问题是,读取所有文件大约需要 13 秒(老实说还是有点慢)
但是当我添加一行附加代码时,这个过程花了很多时间才完成,大约 4 分钟。
下面是代码片段:
# CsvList: [File Path, Change Date, File size, File Name]
for x, file in enumerate(CsvList):
timeColumn = ['TIME']
df = dd.read_csv(file[0], sep =',', skiprows = 3, encoding= 'CP932', engine='python', usecols=timeColumn)
# The process became long when this code is added
startEndList.append(list(df.head(1)) + list(df.tail(1)))
为什么会这样?我正在使用 dask.dataframe
第一种方法仅使用 Python 作为起点:
import pandas as pd
import io
def read_first_and_last_lines(filename):
with open(filename, 'rb') as fp:
# skip first 4 rows (headers)
[next(fp) for _ in range(4)]
# first line
first_line = fp.readline()
# start at -2x length of first line from the end of file
fp.seek(-2 * len(first_line), 2)
# last line
last_line = fp.readlines()[-1]
return first_line + last_line
data = []
for filename in pathlib.Path('data').glob('*.csv'):
data.append(read_first_and_last_lines(filename))
buf = io.BytesIO()
buf.writelines(data)
buf.seek(0)
df = pd.read_csv(buf, header=None, encoding='CP932')
目前,您的代码并未真正利用 Dask 的并行化功能,因为:
df.head
和 df.tail
调用将触发“计算”(即,将您的 Dask DataFrame 转换为 pandas DataFrame——这是我们在惰性评估中试图最小化的与 Dask)和
- for-loop 是 运行 顺序,因为您正在创建 Dask DataFrames 并将它们转换为 pandas DataFrames,所有这些都在循环中。
因此,您当前的示例类似于仅在 for-loop 中使用 pandas,但增加了 Dask-to-pandas-conversion 开销。
由于您需要处理每个文件,我建议您查看 Dask Delayed,这在这里可能更优雅+有用。以下 (pseudo-code) 将对每个文件并行执行 pandas 操作:
import dask
import pandas as pd
for file in list_of_files:
df = dask.delayed(pd.read_csv)(file)
result.append(df.head(1) + df.tail(1))
dask.compute(*result)
当我使用 4 csv-files 时 dask.visualize(*result)
的输出确认并行性:
如果你真的想在这里使用Dask DataFrame,你可以尝试:
- 将所有文件读入单个 Dask DataFrame,
- 确保每个Dask“分区”对应一个文件,
- 使用 Dask Dataframe
apply
获取 head
和 tail
值并将它们附加到新列表
- 在新列表上调用计算
这是我第一次创建用于处理包含大量数据的文件的代码,所以我有点卡在这里。
我想做的是读取路径列表,列出所有需要读取的 csv 文件,从每个文件中检索 HEAD 和 TAIL 并将其放入列表中。
我总共有 621 个 csv 文件,每个文件由 5800 行和 251 列组成
[LOGGING],RD81DL96_1,3,4,5,2,,,,
LOG01,,,,,,,,,
DATETIME,INDEX,SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0]
TIME,INDEX,FF-1(1A) ,FF-1(1B) ,FF-1(1C) ,FF-1(2A),FF-2(1A) ,FF-2(1B) ,FF-2(1C),FF-2(2A)
47:29.6,1,172,0,139,1258,0,0,400,0
47:34.6,2,172,0,139,1258,0,0,400,0
47:39.6,3,172,0,139,1258,0,0,400,0
47:44.6,4,172,0,139,1263,0,0,400,0
47:49.6,5,172,0,139,1263,0,0,450,0
47:54.6,6,172,0,139,1263,0,0,450,0
问题是,读取所有文件大约需要 13 秒(老实说还是有点慢)
但是当我添加一行附加代码时,这个过程花了很多时间才完成,大约 4 分钟。
下面是代码片段:
# CsvList: [File Path, Change Date, File size, File Name]
for x, file in enumerate(CsvList):
timeColumn = ['TIME']
df = dd.read_csv(file[0], sep =',', skiprows = 3, encoding= 'CP932', engine='python', usecols=timeColumn)
# The process became long when this code is added
startEndList.append(list(df.head(1)) + list(df.tail(1)))
为什么会这样?我正在使用 dask.dataframe
第一种方法仅使用 Python 作为起点:
import pandas as pd
import io
def read_first_and_last_lines(filename):
with open(filename, 'rb') as fp:
# skip first 4 rows (headers)
[next(fp) for _ in range(4)]
# first line
first_line = fp.readline()
# start at -2x length of first line from the end of file
fp.seek(-2 * len(first_line), 2)
# last line
last_line = fp.readlines()[-1]
return first_line + last_line
data = []
for filename in pathlib.Path('data').glob('*.csv'):
data.append(read_first_and_last_lines(filename))
buf = io.BytesIO()
buf.writelines(data)
buf.seek(0)
df = pd.read_csv(buf, header=None, encoding='CP932')
目前,您的代码并未真正利用 Dask 的并行化功能,因为:
df.head
和df.tail
调用将触发“计算”(即,将您的 Dask DataFrame 转换为 pandas DataFrame——这是我们在惰性评估中试图最小化的与 Dask)和- for-loop 是 运行 顺序,因为您正在创建 Dask DataFrames 并将它们转换为 pandas DataFrames,所有这些都在循环中。
因此,您当前的示例类似于仅在 for-loop 中使用 pandas,但增加了 Dask-to-pandas-conversion 开销。
由于您需要处理每个文件,我建议您查看 Dask Delayed,这在这里可能更优雅+有用。以下 (pseudo-code) 将对每个文件并行执行 pandas 操作:
import dask
import pandas as pd
for file in list_of_files:
df = dask.delayed(pd.read_csv)(file)
result.append(df.head(1) + df.tail(1))
dask.compute(*result)
当我使用 4 csv-files 时 dask.visualize(*result)
的输出确认并行性:
如果你真的想在这里使用Dask DataFrame,你可以尝试:
- 将所有文件读入单个 Dask DataFrame,
- 确保每个Dask“分区”对应一个文件,
- 使用 Dask Dataframe
apply
获取head
和tail
值并将它们附加到新列表 - 在新列表上调用计算