如何使用 Dask read_csv 读取每第 n 行以在多个文件中进行快速多次读取?

How to read every nth row using Dask read_csv for fast multiple reading in multiple files?

我正在尝试将多个 CSV 文件读入单个数据帧。虽然这可以使用列表理解和 Panda 的 concat 函数,例如

import pandas as pd
files = ['file1.csv', 'file2.csv', etc....]
all_df = []
for filename in files:
    all_df.append(pd.read_csv(filename))
df = pd.concat(all_df)

当文件是一个长列表(例如 100 项)时,我发现这太慢了。

我试过使用 Dask,它接受列表作为输入并具有内置的并行化速度,例如

import dask.dataframe as dd
df_dask = dd.read_csv(files)
df = df_dask.compute()

这使速度提高了约 2 倍。

但是,为了进一步加快速度,我希望能够只读入文件的每 N 行。

使用 Pandas,我可以使用 lambda 函数和 read_csv 的 skiprows 参数来完成此操作。例如 cond = lambda x : x % downsampling != 0 并在循环中使用 pd.read_csv(filename, skiprows=cond).

但是,这对 Dask 不起作用,并且 skiprows 参数不接受 lambda 函数。我无法将整数传递给 skiprows,因为每个文件都有不同的长度,因此每个文件要跳过的行都不同。

有没有快速的解决办法?我认为某种与 Dask 兼容的下采样操作可能是一种解决方案,但不确定如何实施。

请问可以吗?

详细说明@quizzical_panini 使用 dask.delayed 的建议:

import dask
import pandas as pd

@dask.delayed
def custom_pandas_load(file_path):
     # do what you would do if you had one file
    cond = lambda x : x % downsampling != 0
    df = pd.read_csv(file_path, skiprows=cond)
    return df

[computed_dfs] = dask.compute(
    [custom_pandas_load(file_path)
     for file_path in files]
)

df_final = pd.concat(computed_dfs)