将分类列转换为附加列
Convert column of categoricals to additional columns
我有一个大型数据集,其形式为我之前从 avro 文件加载的以下数据框
timestamp
id
category
value
2021-01-01 00:00:00+00:00
a
d
g
2021-01-01 00:10:00+00:00
a
d
h
2021-01-01 00:10:00+00:00
a
e
h
2021-01-01 00:00:00+00:00
b
e
h
我想旋转 category
列(其中包含大约 50 个不同类别)并沿着 timestamp
和 id
列进行重复数据删除,这样结果看起来像这样
id
timestamp
d
e
a
2021-01-01 00:00:00+00:00
g
nan
a
2021-01-01 00:10:00+00:00
h
h
b
2021-01-01 00:00:00+00:00
nan
h
我知道如何在 pandas
中使用多索引和 stack
/unstack
操作来实现这一点,但是我的数据集太大而无法使用 pandas
没有手动批处理并且 dask
不支持多索引。有什么方法可以用 dask
?
有效地完成吗?
编辑:
正如@Dahn 所指出的,我创建了一个最小的合成示例 pandas:
import pandas as pd
records = [
{'idx': 0, 'id': 'a', 'category': 'd', 'value': 1},
{'idx': 1, 'id': 'a', 'category': 'e', 'value': 2},
{'idx': 2, 'id': 'a', 'category': 'f', 'value': 3},
{'idx': 0, 'id': 'b', 'category': 'd', 'value': 4},
{'idx': 1, 'id': 'c', 'category': 'e', 'value': 5},
{'idx': 2, 'id': 'c', 'category': 'f', 'value': 6}
]
frame = pd.DataFrame(records)
idx id category value
0 0 a d 1
1 1 a e 2
2 2 a f 3
3 0 b d 4
4 1 c e 5
5 2 c f 6
frame = frame.set_index(['id', 'idx', 'category'], drop=True).unstack().droplevel(0, axis=1).reset_index()
frame.columns.name = ''
id idx d e f
0 a 0 1.0 NaN NaN
1 a 1 NaN 2.0 NaN
2 a 2 NaN NaN 3.0
3 b 0 4.0 NaN NaN
4 c 1 NaN 5.0 NaN
5 c 2 NaN NaN 6.0
我不认为 Dask 会在 2021 年 10 月实现这一点。这可能是因为不支持 unstack
需要的多索引。不过最近有some work on this。
但是,我认为这仍然可以使用 apply-concat-apply paradigm(和 apply_concat_apply
函数)。
下面的解决方案适用于您提供的示例,原则上,我认为它应该可以正常工作,但我不确定。请谨慎行事,如果可能,请检查结果是否与 Pandas 给您的一致。我还在 Dask 的 github 本身上将其作为 feature request 发布。
import dask.dataframe as dd
# Create Dask DataFrame out of your `frame`
# npartitions is more than 1 to demonstrate this works on a partitioned datataset
df = dd.from_pandas(frame, npartitions=3)
# Dask needs to know work out what the categories are
# Alternatively you can use df.categorize
# See https://docs.dask.org/en/latest/dataframe-categoricals.html
category = 'category'
df[category] = df[category].astype(category).cat.as_known()
# Dask needs to know what the resulting DataFrame looks like
new_columns = pd.CategoricalIndex(df[category].cat.categories, name=category)
meta = pd.DataFrame(columns=new_columns,
index=df._meta.set_index(['idx', 'id']).index)
# Implement using apply_concat_apply ("aca")
# More details: https://blog.dask.org/2019/10/08/df-groupby
def identity(x): return x
def my_unstack(x):
return x.set_index(['id', 'idx', 'category'], drop=True).unstack()
def combine(x):
return x.groupby(level=[0, 1]).sum()
result = dd.core.apply_concat_apply([df],
chunk=identity,
aggregate=my_unstack,
combine=combine,
meta=meta)
result.compute()
选项 B:map_partitions
如果您已经能够根据 idx
或 id
中的至少一项对数据进行排序,那么您也可以简单地使用 map_partitions
并将每个分区视为 Pandas数据框。
这应该会显着改善内存使用和整体性能。
# df has sorted index `idx` in this scenario
category = 'category'
existing_categories = df[category].astype(category).cat.as_known().cat.categories
categories = [('value', cat) for cat in existing_categories]
new_columns = pd.MultiIndex.from_tuples(categories, names=(None, category))
meta = pd.DataFrame(columns=new_columns,
index=df._meta.set_index(['idx', 'id']).index)
def unstack_add_columns(x):
x = x.set_index(['id', 'category'], append=True, drop=True).unstack()
# make sure that result contains all necessary columns
return x.reindex(columns=new_columns)
df.map_partitions(unstack_add_columns, meta=meta)
如果您不能保证 idx 会被排序,您可以尝试类似
df_sorted = df.set_index('idx')
# I recommend saving to disk in between set_index and the rest
df_sorted.to_parquet('data-sorted.parq')
但这本身可能会带来内存问题。
作为 Dahn 回答的补充,为了回到非多级索引框架,我执行了以下操作:
meta = pd.DataFrame(
columns=['level_0', 'idx', 'id'] + [x for x in existing_categories],
index=df._meta.reset_index().index
)
def reset_index(x):
x = x.droplevel(0, axis=1)
x.columns.name = None
return x.reset_index()
df = df.map_partitions(reset_index, meta=meta).drop('level_0', axis=1)
可能有更优雅的解决方案来实现这一点,但它适用于我。
我有一个大型数据集,其形式为我之前从 avro 文件加载的以下数据框
timestamp | id | category | value |
---|---|---|---|
2021-01-01 00:00:00+00:00 | a | d | g |
2021-01-01 00:10:00+00:00 | a | d | h |
2021-01-01 00:10:00+00:00 | a | e | h |
2021-01-01 00:00:00+00:00 | b | e | h |
我想旋转 category
列(其中包含大约 50 个不同类别)并沿着 timestamp
和 id
列进行重复数据删除,这样结果看起来像这样
id | timestamp | d | e |
---|---|---|---|
a | 2021-01-01 00:00:00+00:00 | g | nan |
a | 2021-01-01 00:10:00+00:00 | h | h |
b | 2021-01-01 00:00:00+00:00 | nan | h |
我知道如何在 pandas
中使用多索引和 stack
/unstack
操作来实现这一点,但是我的数据集太大而无法使用 pandas
没有手动批处理并且 dask
不支持多索引。有什么方法可以用 dask
?
编辑:
正如@Dahn 所指出的,我创建了一个最小的合成示例 pandas:
import pandas as pd
records = [
{'idx': 0, 'id': 'a', 'category': 'd', 'value': 1},
{'idx': 1, 'id': 'a', 'category': 'e', 'value': 2},
{'idx': 2, 'id': 'a', 'category': 'f', 'value': 3},
{'idx': 0, 'id': 'b', 'category': 'd', 'value': 4},
{'idx': 1, 'id': 'c', 'category': 'e', 'value': 5},
{'idx': 2, 'id': 'c', 'category': 'f', 'value': 6}
]
frame = pd.DataFrame(records)
idx id category value
0 0 a d 1
1 1 a e 2
2 2 a f 3
3 0 b d 4
4 1 c e 5
5 2 c f 6
frame = frame.set_index(['id', 'idx', 'category'], drop=True).unstack().droplevel(0, axis=1).reset_index()
frame.columns.name = ''
id idx d e f
0 a 0 1.0 NaN NaN
1 a 1 NaN 2.0 NaN
2 a 2 NaN NaN 3.0
3 b 0 4.0 NaN NaN
4 c 1 NaN 5.0 NaN
5 c 2 NaN NaN 6.0
我不认为 Dask 会在 2021 年 10 月实现这一点。这可能是因为不支持 unstack
需要的多索引。不过最近有some work on this。
但是,我认为这仍然可以使用 apply-concat-apply paradigm(和 apply_concat_apply
函数)。
下面的解决方案适用于您提供的示例,原则上,我认为它应该可以正常工作,但我不确定。请谨慎行事,如果可能,请检查结果是否与 Pandas 给您的一致。我还在 Dask 的 github 本身上将其作为 feature request 发布。
import dask.dataframe as dd
# Create Dask DataFrame out of your `frame`
# npartitions is more than 1 to demonstrate this works on a partitioned datataset
df = dd.from_pandas(frame, npartitions=3)
# Dask needs to know work out what the categories are
# Alternatively you can use df.categorize
# See https://docs.dask.org/en/latest/dataframe-categoricals.html
category = 'category'
df[category] = df[category].astype(category).cat.as_known()
# Dask needs to know what the resulting DataFrame looks like
new_columns = pd.CategoricalIndex(df[category].cat.categories, name=category)
meta = pd.DataFrame(columns=new_columns,
index=df._meta.set_index(['idx', 'id']).index)
# Implement using apply_concat_apply ("aca")
# More details: https://blog.dask.org/2019/10/08/df-groupby
def identity(x): return x
def my_unstack(x):
return x.set_index(['id', 'idx', 'category'], drop=True).unstack()
def combine(x):
return x.groupby(level=[0, 1]).sum()
result = dd.core.apply_concat_apply([df],
chunk=identity,
aggregate=my_unstack,
combine=combine,
meta=meta)
result.compute()
选项 B:map_partitions
如果您已经能够根据 idx
或 id
中的至少一项对数据进行排序,那么您也可以简单地使用 map_partitions
并将每个分区视为 Pandas数据框。
这应该会显着改善内存使用和整体性能。
# df has sorted index `idx` in this scenario
category = 'category'
existing_categories = df[category].astype(category).cat.as_known().cat.categories
categories = [('value', cat) for cat in existing_categories]
new_columns = pd.MultiIndex.from_tuples(categories, names=(None, category))
meta = pd.DataFrame(columns=new_columns,
index=df._meta.set_index(['idx', 'id']).index)
def unstack_add_columns(x):
x = x.set_index(['id', 'category'], append=True, drop=True).unstack()
# make sure that result contains all necessary columns
return x.reindex(columns=new_columns)
df.map_partitions(unstack_add_columns, meta=meta)
如果您不能保证 idx 会被排序,您可以尝试类似
df_sorted = df.set_index('idx')
# I recommend saving to disk in between set_index and the rest
df_sorted.to_parquet('data-sorted.parq')
但这本身可能会带来内存问题。
作为 Dahn 回答的补充,为了回到非多级索引框架,我执行了以下操作:
meta = pd.DataFrame(
columns=['level_0', 'idx', 'id'] + [x for x in existing_categories],
index=df._meta.reset_index().index
)
def reset_index(x):
x = x.droplevel(0, axis=1)
x.columns.name = None
return x.reset_index()
df = df.map_partitions(reset_index, meta=meta).drop('level_0', axis=1)
可能有更优雅的解决方案来实现这一点,但它适用于我。