Filtering grouped df in Dask

Related to a similar question for Pandas: filtering grouped df in pandas

Action: Eliminate groups based on an expression applied to a column other than the groupby column.

Problem: Filter is not implemented for grouped dataframes.

Tried: Groupby and apply to eliminate certain groups. This returns an index error, perhaps because the function passed to apply should always return something?

In [16]:
def filter_empty(df):
    if not df.label.values.all(4):
        return df

df_nonempty = df_norm.groupby('hash').apply(filter_empty, meta=meta)

In [17]:
len(df_nonempty.hash.unique())
...

<ipython-input-16-6da6d9b6c069> in filter_empty()
      1 def filter_empty(df):
----> 2     if not df.label.values.all(4):
      3         return df
      4 
      5 df_nonempty = df_norm.groupby('hash').apply(filter_empty, meta=meta)

/opt/conda/lib/python3.5/site-packages/numpy/core/_methods.py in _all()
     39 
     40 def _all(a, axis=None, dtype=None, out=None, keepdims=False):
---> 41     return umr_all(a, axis, dtype, out, keepdims)
     42 
     43 def _count_reduce_items(arr, axis):

ValueError: 'axis' entry is out of bounds

Question: Is there another way to achieve the Dask equivalent of Pandas grouped.filter(lambda x: len(x) > 1)? Or is groupby apply simply implemented incorrectly?

Example

import numpy as np
import pandas as pd
import dask.dataframe as dd

In [3]:
df = pd.DataFrame({'A':list('aacaaa'),
                   'B':[4,5,4,5,5,4],
                   'C':[7,8,9,4,2,3],
                   'D':[1,3,5,7,1,0],
                   'E':[5,3,6,9,2,4],
                   'F':list('aaabbc')})
df = dd.from_pandas(df, npartitions=1)

In [8]:
df.A.unique().compute()
Out[8]:
0    a
1    c
Name: A, dtype: object

In [6]:
def filter_4(df):
    if not df.B.values.all(4):
        return df

df_notalla = df.groupby('A').apply(filter_4, meta=df)

In [10]:
df_notalla.A.unique().compute()
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-10-894a491faa57> in <module>()
----> 1 df_notalla.A.unique().compute()

...

<ipython-input-6-ef10326ae42a> in filter_4(df)
      1 def filter_4(df):
----> 2     if not df.B.values.all(4):
      3         return df
      4 
      5 df_notalla = df.groupby('A').apply(filter_4, meta=df)

/opt/conda/lib/python3.5/site-packages/numpy/core/_methods.py in _all(a, axis, dtype, out, keepdims)
     39 
     40 def _all(a, axis=None, dtype=None, out=None, keepdims=False):
---> 41     return umr_all(a, axis, dtype, out, keepdims)
     42 
     43 def _count_reduce_items(arr, axis):

ValueError: 'axis' entry is out of bounds

I think you can use groupby + size first, then map for the Series (it works like transform, which is not implemented in dask either), and finally filter by boolean indexing:

df = pd.DataFrame({'A':list('aacaaa'),
                   'B':[4,5,4,5,5,4],
                   'C':[7,8,9,4,2,3],
                   'D':[1,3,5,7,1,0],
                   'E':[5,3,6,9,2,4],
                   'F':list('aaabbc')})

print (df)
   A  B  C  D  E  F
0  a  4  7  1  5  a
1  a  5  8  3  3  a
2  c  4  9  5  6  a
3  a  5  4  7  9  b
4  a  5  2  1  2  b
5  a  4  3  0  4  c

a = df.groupby('F')['A'].size()
print (a)
F
a    3
b    2
c    1
Name: A, dtype: int64

s = df['F'].map(a)
print (s)
0    3
1    3
2    3
3    2
4    2
5    1
Name: F, dtype: int64

df = df[s > 1]
print (df)
   A  B  C  D  E  F
0  a  4  7  1  5  a
1  a  5  8  3  3  a
2  c  4  9  5  6  a
3  a  5  4  7  9  b
4  a  5  2  1  2  b

EDIT:

I think groupby is not necessary here:

df_notall4 = df[df.C != 4].drop_duplicates(subset=['A','D'])['D'].compute()

But if you really need it:

def filter_4(x):
    return x[x.C != 4]

df_notall4 = df.groupby('A').apply(filter_4, meta=df).D.unique().compute()
print (df_notall4)
0    1
1    3
2    0
3    5
Name: D, dtype: int64

Thanks to @jezrael, I reviewed my implementation and created the following solution (see the example I provided).

df_notall4 = []
for d in list(df[df.C != 4].D.unique().compute()):
    df_notall4.append(df.groupby('D').get_group(d))

df_notall4 = dd.concat(df_notall4, interleave_partitions=True)

The result is:

In [8]:
df_notall4.D.unique().compute()
Out[8]:
0    1
1    3
2    5
3    0
Name: D, dtype: object