Dask dataframe - 根据分隔符将列拆分为多行
Dask dataframe - split column into multiple rows based on delimiter
使用 dask dataframe 将一列拆分为多行的有效方法是什么?例如,假设我有一个 csv 文件,我使用 dask 读取该文件以生成以下 dask 数据帧:
id var1 var2
1 A Z,Y
2 B X
3 C W,U,V
我想将其转换为:
id var1 var2
1 A Z
1 A Y
2 B X
3 C W
3 C U
3 C V
我已经查看了 Split (explode) pandas dataframe string entry to separate rows and pandas: How do I split text in a column into multiple rows? 的答案。
我尝试应用 中给出的答案,但 dask 似乎不接受 str.split 中的扩展关键字。
我还尝试应用 but then found out that np.repeat isn't implemented in dask with integer arrays (https://github.com/dask/dask/issues/2946) 中建议的矢量化方法。
我在 pandas 中尝试了一些其他方法,但它们真的很慢 - 使用 dask 可能会更快,但我想先检查是否有人使用任何特定方法取得成功。我正在处理一个包含超过 1000 万行和 10 列(字符串数据)的数据集。拆分成行后,它可能会变成约 5000 万行。
感谢您关注此事!我很感激。
Dask 允许您直接使用 pandas 进行逐行操作(像这样)或者可以一次应用于一个分区。请记住,一个 Dask 数据帧由一组 Pandas 个数据帧组成。
对于 Pandas 的情况,您可以根据链接的问题执行此操作:
df = pd.DataFrame([["A", "Z,Y"], ["B", "X"], ["C", "W,U,V"]],
columns=['var1', 'var2'])
df.drop('var2', axis=1).join(
df.var2.str.split(',', expand=True).stack().reset_index(drop=True, level=1).rename('var2'))
因此对于 Dask,您可以通过 map_partitions
应用完全相同的方法,因为每一行都独立于所有其他行。如果传递的函数是单独写出来的,而不是作为 lambda,这可能看起来更清晰:
d = dd.from_pandas(df, 2)
d.map_partitions(
lambda df: df.drop('var2', axis=1).join(
df.var2.str.split(',', expand=True).stack().reset_index(drop=True, level=1).rename('var2')))
如果您对此进行 .compute()
,您将得到与上述 Pandas 案例完全相同的结果。您可能 不想 像那样一次性计算您的海量数据帧,而是对其进行进一步处理。
使用这个:
>>> df.join(pd.DataFrame(df.var2.str.split(',', expand=True).stack()
.reset_index(level=1, drop=True),columns=['var2 '])).drop('var2',1)
.rename(columns=str.strip)
id var1 var2
0 1 A Z
0 1 A Y
1 2 B X
2 3 C W
2 3 C U
2 3 C V
>>>
或者如果需要重新设置索引:
>>> df.join(pd.DataFrame(df.var2.str.split(',', expand=True).stack()
.reset_index(level=1, drop=True),columns=['var2 '])).drop('var2',1)
.rename(columns=str.strip).reset_index(drop=True)
id var1 var2
0 1 A Z
1 1 A Y
2 2 B X
3 3 C W
4 3 C U
5 3 C V
>>>
到一个简单的数据框:
from dask import dataframe as dd
sd = dd.from_pandas(df, npartitions=6)
时间(字面上相同):
>>> timeit.timeit(lambda: df.join(pd.DataFrame(df.var2.str.split(',', expand=True).stack()
.reset_index(level=1, drop=True),columns=['var2 '])).drop('var2',1)
.rename(columns=str.strip),number=10) # U9-Forward
0.05815268672555618
>>> timeit.timeit(lambda: df.drop('var2', axis=1).join(
df.var2.str.split(',', expand=True).stack().reset_index(drop=True, level=1).rename('var2')),number=10) # mdurant
0.05137591577754108
>>>
使用 dask dataframe 将一列拆分为多行的有效方法是什么?例如,假设我有一个 csv 文件,我使用 dask 读取该文件以生成以下 dask 数据帧:
id var1 var2
1 A Z,Y
2 B X
3 C W,U,V
我想将其转换为:
id var1 var2
1 A Z
1 A Y
2 B X
3 C W
3 C U
3 C V
我已经查看了 Split (explode) pandas dataframe string entry to separate rows and pandas: How do I split text in a column into multiple rows? 的答案。
我尝试应用 中给出的答案,但 dask 似乎不接受 str.split 中的扩展关键字。
我还尝试应用 but then found out that np.repeat isn't implemented in dask with integer arrays (https://github.com/dask/dask/issues/2946) 中建议的矢量化方法。
我在 pandas 中尝试了一些其他方法,但它们真的很慢 - 使用 dask 可能会更快,但我想先检查是否有人使用任何特定方法取得成功。我正在处理一个包含超过 1000 万行和 10 列(字符串数据)的数据集。拆分成行后,它可能会变成约 5000 万行。
感谢您关注此事!我很感激。
Dask 允许您直接使用 pandas 进行逐行操作(像这样)或者可以一次应用于一个分区。请记住,一个 Dask 数据帧由一组 Pandas 个数据帧组成。
对于 Pandas 的情况,您可以根据链接的问题执行此操作:
df = pd.DataFrame([["A", "Z,Y"], ["B", "X"], ["C", "W,U,V"]],
columns=['var1', 'var2'])
df.drop('var2', axis=1).join(
df.var2.str.split(',', expand=True).stack().reset_index(drop=True, level=1).rename('var2'))
因此对于 Dask,您可以通过 map_partitions
应用完全相同的方法,因为每一行都独立于所有其他行。如果传递的函数是单独写出来的,而不是作为 lambda,这可能看起来更清晰:
d = dd.from_pandas(df, 2)
d.map_partitions(
lambda df: df.drop('var2', axis=1).join(
df.var2.str.split(',', expand=True).stack().reset_index(drop=True, level=1).rename('var2')))
如果您对此进行 .compute()
,您将得到与上述 Pandas 案例完全相同的结果。您可能 不想 像那样一次性计算您的海量数据帧,而是对其进行进一步处理。
使用这个:
>>> df.join(pd.DataFrame(df.var2.str.split(',', expand=True).stack()
.reset_index(level=1, drop=True),columns=['var2 '])).drop('var2',1)
.rename(columns=str.strip)
id var1 var2
0 1 A Z
0 1 A Y
1 2 B X
2 3 C W
2 3 C U
2 3 C V
>>>
或者如果需要重新设置索引:
>>> df.join(pd.DataFrame(df.var2.str.split(',', expand=True).stack()
.reset_index(level=1, drop=True),columns=['var2 '])).drop('var2',1)
.rename(columns=str.strip).reset_index(drop=True)
id var1 var2
0 1 A Z
1 1 A Y
2 2 B X
3 3 C W
4 3 C U
5 3 C V
>>>
到一个简单的数据框:
from dask import dataframe as dd
sd = dd.from_pandas(df, npartitions=6)
时间(字面上相同):
>>> timeit.timeit(lambda: df.join(pd.DataFrame(df.var2.str.split(',', expand=True).stack()
.reset_index(level=1, drop=True),columns=['var2 '])).drop('var2',1)
.rename(columns=str.strip),number=10) # U9-Forward
0.05815268672555618
>>> timeit.timeit(lambda: df.drop('var2', axis=1).join(
df.var2.str.split(',', expand=True).stack().reset_index(drop=True, level=1).rename('var2')),number=10) # mdurant
0.05137591577754108
>>>