将分类列转换为附加列

Convert column of categoricals to additional columns

我有一个大型数据集,其形式为我之前从 avro 文件加载的以下数据框

timestamp id category value
2021-01-01 00:00:00+00:00 a d g
2021-01-01 00:10:00+00:00 a d h
2021-01-01 00:10:00+00:00 a e h
2021-01-01 00:00:00+00:00 b e h

我想旋转 category 列(其中包含大约 50 个不同类别)并沿着 timestampid 列进行重复数据删除,这样结果看起来像这样

id timestamp d e
a 2021-01-01 00:00:00+00:00 g nan
a 2021-01-01 00:10:00+00:00 h h
b 2021-01-01 00:00:00+00:00 nan h

我知道如何在 pandas 中使用多索引和 stack/unstack 操作来实现这一点,但是我的数据集太大而无法使用 pandas没有手动批处理并且 dask 不支持多索引。有什么方法可以用 dask?

有效地完成吗?

编辑:

正如@Dahn 所指出的,我创建了一个最小的合成示例 pandas:


import pandas as pd

records = [
    {'idx': 0, 'id': 'a', 'category': 'd', 'value': 1},
    {'idx': 1, 'id': 'a', 'category': 'e', 'value': 2},
    {'idx': 2, 'id': 'a', 'category': 'f', 'value': 3},
    {'idx': 0, 'id': 'b', 'category': 'd', 'value': 4},
    {'idx': 1, 'id': 'c', 'category': 'e', 'value': 5},
    {'idx': 2, 'id': 'c', 'category': 'f', 'value': 6}
]

frame = pd.DataFrame(records)
   idx id category  value
0    0  a        d      1
1    1  a        e      2
2    2  a        f      3
3    0  b        d      4
4    1  c        e      5
5    2  c        f      6
frame = frame.set_index(['id', 'idx', 'category'], drop=True).unstack().droplevel(0, axis=1).reset_index()
frame.columns.name = ''
  id  idx    d    e    f
0  a    0  1.0  NaN  NaN
1  a    1  NaN  2.0  NaN
2  a    2  NaN  NaN  3.0
3  b    0  4.0  NaN  NaN
4  c    1  NaN  5.0  NaN
5  c    2  NaN  NaN  6.0


我不认为 Dask 会在 2021 年 10 月实现这一点。这可能是因为不支持 unstack 需要的多索引。不过最近有some work on this

但是,我认为这仍然可以使用 apply-concat-apply paradigm(和 apply_concat_apply 函数)。

下面的解决方案适用于您提供的示例,原则上,我认为它应该可以正常工作,但我不确定。请谨慎行事,如果可能,请检查结果是否与 Pandas 给您的一致。我还在 Dask 的 github 本身上将其作为 feature request 发布。

import dask.dataframe as dd

# Create Dask DataFrame out of your `frame`
# npartitions is more than 1 to demonstrate this works on a partitioned datataset
df = dd.from_pandas(frame, npartitions=3)

# Dask needs to know work out what the categories are
# Alternatively you can use df.categorize
# See https://docs.dask.org/en/latest/dataframe-categoricals.html
category = 'category'
df[category] = df[category].astype(category).cat.as_known()

# Dask needs to know what the resulting DataFrame looks like
new_columns = pd.CategoricalIndex(df[category].cat.categories, name=category)
meta = pd.DataFrame(columns=new_columns, 
                    index=df._meta.set_index(['idx', 'id']).index)

# Implement using apply_concat_apply ("aca")
# More details: https://blog.dask.org/2019/10/08/df-groupby
def identity(x): return x

def my_unstack(x):
    return x.set_index(['id', 'idx', 'category'], drop=True).unstack()
    
def combine(x):
    return x.groupby(level=[0, 1]).sum()

result = dd.core.apply_concat_apply([df], 
                   chunk=identity, 
                   aggregate=my_unstack, 
                   combine=combine,
                   meta=meta)

result.compute()

选项 B:map_partitions

如果您已经能够根据 idxid 中的至少一项对数据进行排序,那么您也可以简单地使用 map_partitions 并将每个分区视为 Pandas数据框。

这应该会显着改善内存使用和整体性能。

# df has sorted index `idx` in this scenario

category = 'category'
existing_categories = df[category].astype(category).cat.as_known().cat.categories
categories = [('value', cat) for cat in existing_categories]

new_columns = pd.MultiIndex.from_tuples(categories, names=(None, category))

meta = pd.DataFrame(columns=new_columns, 
                    index=df._meta.set_index(['idx', 'id']).index)

def unstack_add_columns(x):
    x = x.set_index(['id', 'category'], append=True, drop=True).unstack()
    # make sure that result contains all necessary columns
    return x.reindex(columns=new_columns) 

df.map_partitions(unstack_add_columns, meta=meta)

如果您不能保证 idx 会被排序,您可以尝试类似

df_sorted = df.set_index('idx')
# I recommend saving to disk in between set_index and the rest
df_sorted.to_parquet('data-sorted.parq')

但这本身可能会带来内存问题。

作为 Dahn 回答的补充,为了回到非多级索引框架,我执行了以下操作:

meta = pd.DataFrame(
        columns=['level_0', 'idx', 'id'] + [x for x in existing_categories],
        index=df._meta.reset_index().index
    )

def reset_index(x):
    x = x.droplevel(0, axis=1)
    x.columns.name = None
    return x.reset_index()

df = df.map_partitions(reset_index, meta=meta).drop('level_0', axis=1)

可能有更优雅的解决方案来实现这一点,但它适用于我。