如何根据时间戳列和名称列对 dask 中的行进行分组,同时保持一定的分布?

How can I group rows in dask based on a timestamp column and a name column whilst maintaining a certain distribution?

我正在尝试从这个较大的 daskframe 制作一个较小的 daskframe,条件是如果在过去 60 天内嫌疑人被看到 5 次或更多次,我只想保留这些记录并丢弃所有旧的那些。但是,如果嫌疑人还没有被见过 5 次,我想再回到过去 60 天并再次检查,直到我达到至少 5 次的标准,如果某个嫌疑人实际上不超过 5 次在整个数据集中,我将只取其中的任何内容。

这是我试过的:

    time_sliced_df = []

    df = df.sort_values('suspect')
    print(df.groupby('suspect').aggregate())

    for counter, row in df.iterrows():
        print('{}/{}'.format(counter, len(df)))
        for days_back in range(0, 1095, 60):
            time_sliced_df.append(row) if row.date_sighted > (datetime.now() - timedelta(days=days_back)) else None
            if len(time_sliced_df) > 5:
                list_of_recent_suspects.append(time_sliced_df)
                list_of_recent_suspects= list(np.array(list_of_recent_suspects, dtype='object').flatten())
                break

    return list_of_recent_suspects

过去在使用 pandas 时,我有一些过度设计的多处理方法仍然需要很长时间,我真的希望利用 dask 能帮助我克服这个问题,它已经帮了我很多很舒服。

我不只是要求某人提供代码,甚至只是被告知我应该使用 groupby 或尝试某种 dask/pandas 查询等

       suspect date_sighted  
48         ABC 2016-05-12
54       PPPPP 2017-01-07
55         ABC 2017-01-18
57         CFG 2017-01-28
63       PPPPP 2017-03-03
...        ...        ...
3378       IOI 2019-08-14
3418      ZZZZ 2019-08-14
3472       IOI 2019-08-16
3607      ZZZZ 2019-08-19
3669       CFG 2019-08-20

谢谢!

def segregate_by_last_60_days(df: dd.DataFrame) -> dd.DataFrame:
    for days_back in range(0, 1095, 60):
        placeholder_df: dd.DataFrame = df[df.date_sighted > (datetime.now() - timedelta(days=days_back))]

    if len(placeholder_df) > 5:
        return placeholder_df

return placeholder_df

def collate_dataset(df: dd.DataFrame) -> dd.DataFrame:
    return df.groupby('suspect').apply(segregate_by_last_60_days, meta=df.head(0))

这就是我最终的解决方案,比我预期的更高效。