你如何从 Dask 中删除值计数不符合特定阈值的行?

How do you drop rows from Dask where the value count doesn't meet a certain threshold?

我正在处理一个相当大的数据集。未压缩的 CSV 文件约为 20 GB。我正在尝试使用 Dask,但对它不是很熟悉。我通常使用Pandas。我试图删除列中特定值的实例数小于特定阈值的行。这是一个例子:

原始数据集:

|icao   | callsign | reg    | acftType |
|-------| -------- |------- | -------- |
|abcdef | ETH720   | ET-ASJ | B738     |
|abcdef | ETH720   | ET-ASJ | B738     |
|abcdef | ETH720   | ET-ASJ | B738     |
|123456 | IBE6827  | EC-LUK | A333     |
|123456 | IBE6827  | EC-LUK | A333     |
|789ghi | FRH571   | OO-ACE | B744     |
|789ghi | FRH571   | OO-ACE | B744     |
|789ghi | FRH571   | OO-ACE | B744     |
|789ghi | FRH571   | OO-ACE | B744     |

如果阈值为 3,则生成的数据帧将为:

|icao   | callsign | reg    | acftType |
|-------| -------- |------- | -------- |
|abcdef | ETH720   | ET-ASJ | B738     |
|abcdef | ETH720   | ET-ASJ | B738     |
|abcdef | ETH720   | ET-ASJ | B738     |
|789ghi | FRH571   | OO-ACE | B744     |
|789ghi | FRH571   | OO-ACE | B744     |
|789ghi | FRH571   | OO-ACE | B744     |
|789ghi | FRH571   | OO-ACE | B744     |

找到方法了,但是好像很绕。我觉得应该有一个更简单的方法。在 Pandas 中是这样的:

threshold = 3
inputFrame = inputFrame.groupby('icao').filter(lambda x: len(x) >= threshold)

但是,Dask中没有filter()。这是我开始工作的复杂代码:

threshold = 3
a = inputFrame.groupby('icao').count().reg
a = a.to_frame()
a = a.rename(columns={'reg':'count'})
inputFrame = inputFrame.merge(a, how='left', on='icao')
inputFrame = inputFrame[(inputFrame['count'] >= threshold )]

有更简单的方法吗?

这个问题的答案可能取决于 'easier' 的定义,但这里有两种替代方法:

策略 #1:构建 icao 与用于 groupby 和计数的虚拟列串联,加入初始 df,然后删除虚拟列。

threshold = 3
inputFrame = inputFrame.join(
    inputFrame.icao.to_frame().assign(DROPME=0).
        groupby('icao').count().query(f'DROPME >= {threshold}'),
    on='icao', how='inner').drop(columns='DROPME').compute()

策略#2:进行分组并计数,然后从初始 df 中删除所有剩余列并与初始 df 连接。

threshold = 3
inputFrame = inputFrame.join(
    inputFrame.
        groupby('icao').count().query(f'reg >= {threshold}').
            drop(columns=set(inputFrame.columns) - set(['icao'])),
    on='icao', how='inner').compute()

测试输入:

      icao    callsign       reg    acftType
1  abcdef    ETH720      ET-ASJ    B738
2  abcdef    ETH720      ET-ASJ    B738
3  abcdef    ETH720      ET-ASJ    B738
4  123456    IBE6827     EC-LUK    A333
5  123456    IBE6827     EC-LUK    A333
6  789ghi    FRH571      OO-ACE    B744
7  789ghi    FRH571      OO-ACE    B744
8  789ghi    FRH571      OO-ACE    B744
9  789ghi    FRH571      OO-ACE    B744

输出:

      icao    callsign       reg    acftType
1  abcdef    ETH720      ET-ASJ    B738
2  abcdef    ETH720      ET-ASJ    B738
3  abcdef    ETH720      ET-ASJ    B738
6  789ghi    FRH571      OO-ACE    B744
7  789ghi    FRH571      OO-ACE    B744
8  789ghi    FRH571      OO-ACE    B744
9  789ghi    FRH571      OO-ACE    B744