Dask dataframe - 根据分隔符将列拆分为多行

Dask dataframe - split column into multiple rows based on delimiter

使用 dask dataframe 将一列拆分为多行的有效方法是什么?例如,假设我有一个 csv 文件,我使用 dask 读取该文件以生成以下 dask 数据帧:

id var1 var2
1  A    Z,Y
2  B    X
3  C    W,U,V

我想将其转换为:

id var1 var2
1  A    Z
1  A    Y
2  B    X
3  C    W
3  C    U
3  C    V

我已经查看了 Split (explode) pandas dataframe string entry to separate rows and pandas: How do I split text in a column into multiple rows? 的答案。

我尝试应用 中给出的答案,但 dask 似乎不接受 str.split 中的扩展关键字。

我还尝试应用 but then found out that np.repeat isn't implemented in dask with integer arrays (https://github.com/dask/dask/issues/2946) 中建议的矢量化方法。

我在 pandas 中尝试了一些其他方法,但它们真的很慢 - 使用 dask 可能会更快,但我想先检查是否有人使用任何特定方法取得成功。我正在处理一个包含超过 1000 万行和 10 列(字符串数据)的数据集。拆分成行后,它可能会变成约 5000 万行。

感谢您关注此事!我很感激。

Dask 允许您直接使用 pandas 进行逐行操作(像这样)或者可以一次应用于一个分区。请记住,一个 Dask 数据帧由一组 Pandas 个数据帧组成。

对于 Pandas 的情况,您可以根据链接的问题执行此操作:

df = pd.DataFrame([["A", "Z,Y"], ["B", "X"], ["C", "W,U,V"]], 
    columns=['var1', 'var2'])
df.drop('var2', axis=1).join(
    df.var2.str.split(',', expand=True).stack().reset_index(drop=True, level=1).rename('var2'))

因此对于 Dask,您可以通过 map_partitions 应用完全相同的方法,因为每一行都独立于所有其他行。如果传递的函数是单独写出来的,而不是作为 lambda,这可能看起来更清晰:

d = dd.from_pandas(df, 2)
d.map_partitions(
    lambda df: df.drop('var2', axis=1).join(
        df.var2.str.split(',', expand=True).stack().reset_index(drop=True, level=1).rename('var2')))

如果您对此进行 .compute(),您将得到与上述 Pandas 案例完全相同的结果。您可能 不想 像那样一次性计算您的海量数据帧,而是对其进行进一步处理。

使用这个:

>>> df.join(pd.DataFrame(df.var2.str.split(',', expand=True).stack()
            .reset_index(level=1, drop=True),columns=['var2 '])).drop('var2',1)
            .rename(columns=str.strip)
   id var1 var2
0   1    A    Z
0   1    A    Y
1   2    B    X
2   3    C    W
2   3    C    U
2   3    C    V
>>> 

或者如果需要重新设置索引:

>>> df.join(pd.DataFrame(df.var2.str.split(',', expand=True).stack()                     
            .reset_index(level=1, drop=True),columns=['var2 '])).drop('var2',1)
            .rename(columns=str.strip).reset_index(drop=True)
   id var1 var2
0   1    A    Z
1   1    A    Y
2   2    B    X
3   3    C    W
4   3    C    U
5   3    C    V
>>> 

到一个简单的数据框:

from dask import dataframe as dd 
sd = dd.from_pandas(df, npartitions=6)

时间(字面上相同):

>>> timeit.timeit(lambda: df.join(pd.DataFrame(df.var2.str.split(',', expand=True).stack()
            .reset_index(level=1, drop=True),columns=['var2 '])).drop('var2',1)
            .rename(columns=str.strip),number=10) # U9-Forward
0.05815268672555618
>>> timeit.timeit(lambda: df.drop('var2', axis=1).join(
    df.var2.str.split(',', expand=True).stack().reset_index(drop=True, level=1).rename('var2')),number=10) # mdurant
0.05137591577754108
>>>