Dask Dataframe:按 A 列删除重复项,保留 B 列中具有最高值的行
Dask Dataframe: Remove duplicates by columns A, keeping the row with the highest value in column B
基本上这是 python pandas: Remove duplicates by columns A, keeping the row with the highest value in column B 中 pandas 的答案。在pandas我采用了解决方案
df.sort_values('B', ascending=False).drop_duplicates('A').sort_index()
但我无法有效地将相同的解决方案应用于 dask,因为 dask 不喜欢 sort_values
。我可以通过
获得最大索引
max_idx = df.groupby("A")["B"].idxmax().values
但我必须先计算最大索引,然后才能将它们用作 df.loc
的参数,即
df.loc[max_idx.compute()]
在整个 dask 框架上,方法 df.nlargest(1, "B")
满足了我的需要,但我还没有想出如何使用 groupby 来满足我的需要。
在我基于 dask 框架的分析中,我的工作流程目前使用 dask 进行内存不足操作,以对数据集进行不同的操作和选择,直到它达到可管理的大小,然后继续 pandas,所以我的临时解决方案是将重复删除移动到我分析的 pandas 部分,但我很好奇是否有一种高效优雅的方法可以在 dask 中完成它。
Remove duplicates by columns A, keeping the row with the highest value in column B
在这种情况下,df.sort_values('B', ascending=False).drop_duplicates('A').sort_index()
的 pandas 解决方案需要全局排序,我们在 set_index
之外的 CPU 上的 Dask 中没有(尽管我们在 GPU 上有) ).
一般来说,解决此类问题的有效方法是尽量减少对全局信息的需求。
在这种情况下,您可以根据散列 bashed 洗牌 + 分区内 map/reduce 来重构您的算法,因为给定的行只需要知道与同一键关联的其他行。
import pandas as pd
import dask.dataframe as dd
import numpy as np
np.random.seed(12)
df = pd.DataFrame({
"a": [0,1,2,3,4]*20,
"b": np.random.normal(10, 5, 100)
})
ddf = dd.from_pandas(df, npartitions=10)
print(df.sort_values('b', ascending=False).drop_duplicates('a').sort_index())
a b
9 4 24.359097
16 1 15.062577
47 2 21.209089
53 3 20.571721
75 0 18.182315
使用 Dask,我们可以进行基于散列的随机播放,这将保证给定键的所有行都在同一分区中。然后,我们可以 运行 我们 pandas 在每个分区上独立减少。
print(ddf.shuffle(on="a").map_partitions(
lambda x: x.sort_values("b", ascending=False).drop_duplicates('a')
).compute())
a b
16 1 15.062577
47 2 21.209089
9 4 24.359097
75 0 18.182315
53 3 20.571721
如果您需要对您的最终输出进行全局排序,那么事情就会变得复杂。通常,这不是必需的。
基本上这是 python pandas: Remove duplicates by columns A, keeping the row with the highest value in column B 中 pandas 的答案。在pandas我采用了解决方案
df.sort_values('B', ascending=False).drop_duplicates('A').sort_index()
但我无法有效地将相同的解决方案应用于 dask,因为 dask 不喜欢 sort_values
。我可以通过
max_idx = df.groupby("A")["B"].idxmax().values
但我必须先计算最大索引,然后才能将它们用作 df.loc
的参数,即
df.loc[max_idx.compute()]
在整个 dask 框架上,方法 df.nlargest(1, "B")
满足了我的需要,但我还没有想出如何使用 groupby 来满足我的需要。
在我基于 dask 框架的分析中,我的工作流程目前使用 dask 进行内存不足操作,以对数据集进行不同的操作和选择,直到它达到可管理的大小,然后继续 pandas,所以我的临时解决方案是将重复删除移动到我分析的 pandas 部分,但我很好奇是否有一种高效优雅的方法可以在 dask 中完成它。
Remove duplicates by columns A, keeping the row with the highest value in column B
在这种情况下,df.sort_values('B', ascending=False).drop_duplicates('A').sort_index()
的 pandas 解决方案需要全局排序,我们在 set_index
之外的 CPU 上的 Dask 中没有(尽管我们在 GPU 上有) ).
一般来说,解决此类问题的有效方法是尽量减少对全局信息的需求。
在这种情况下,您可以根据散列 bashed 洗牌 + 分区内 map/reduce 来重构您的算法,因为给定的行只需要知道与同一键关联的其他行。
import pandas as pd
import dask.dataframe as dd
import numpy as np
np.random.seed(12)
df = pd.DataFrame({
"a": [0,1,2,3,4]*20,
"b": np.random.normal(10, 5, 100)
})
ddf = dd.from_pandas(df, npartitions=10)
print(df.sort_values('b', ascending=False).drop_duplicates('a').sort_index())
a b
9 4 24.359097
16 1 15.062577
47 2 21.209089
53 3 20.571721
75 0 18.182315
使用 Dask,我们可以进行基于散列的随机播放,这将保证给定键的所有行都在同一分区中。然后,我们可以 运行 我们 pandas 在每个分区上独立减少。
print(ddf.shuffle(on="a").map_partitions(
lambda x: x.sort_values("b", ascending=False).drop_duplicates('a')
).compute())
a b
16 1 15.062577
47 2 21.209089
9 4 24.359097
75 0 18.182315
53 3 20.571721
如果您需要对您的最终输出进行全局排序,那么事情就会变得复杂。通常,这不是必需的。