Dask 数据框:`set_index` 可以将单个索引放入多个分区吗?

Dask dataframe: Can `set_index` put a single index into multiple partitions?

根据经验,每当你在 Dask 数据帧上 set_index 时,Dask 总是会将具有相同索引的行放入单个分区,即使这会导致分区严重不平衡。

这里有一个演示:

import pandas as pd
import dask.dataframe as dd

users = [1]*1000 + [2]*1000 + [3]*1000

df = pd.DataFrame({'user': users})
ddf = dd.from_pandas(df, npartitions=1000)

ddf = ddf.set_index('user')

counts = ddf.map_partitions(lambda x: len(x)).compute()
counts.loc[counts > 0]
# 500    1000
# 999    2000
# dtype: int64

但是,我在任何地方都找不到这种行为的保证。

我曾尝试自己筛选代码但放弃了。我相信这些相互关联的函数之一可能包含答案:

当你set_index时,是不是一个索引永远不能在两个不同的分区中?如果不是,那么这个 属性 在什么条件下成立?


赏金:我会将赏金奖励给来自信誉良好的来源的答案。例如,引用实现表明这个属性必须成立。

Is it the case that a single index can never be in two different partitions?

IIUC,出于实际目的,答案是肯定的。

一个 dask 数据框通常会有多个分区,dask 可能知道也可能不知道与每个分区关联的索引值 (see Partitions)。如果 dask 确实知道哪个分区包含哪个索引范围,那么这将反映在 df.divisions 输出中(如果不知道,则此调用的结果将为 None)。

当 运行 .set_index 时,dask 将计算除法,并且在确定除法时似乎要求除法是连续且唯一的(最后一个元素除外)。相关代码为here.

所以有两个潜在的后续问题:为什么不允许任何非顺序索引,以及作为前面的特定情况,为什么不允许分区中的重复索引。

关于第一个问题:对于较小的数据,考虑允许非排序索引的设计可能是可行的,但您可以想象一般的非排序索引不会很好地扩展,因为 dask将需要以某种方式为每个分区存储索引。

关于第二个问题:看起来应该是可以的,但是现在好像也没有正确实现。请参阅下面的代码段:

# use this to generate 10 indexed partitions
import pandas as pd

for user in range(10):
    
    df = pd.DataFrame({'user_col': [user//3]*100})
    df['user'] = df['user_col']
    df = df.set_index('user')
    df.index.name = 'user_index'
    
    df.to_parquet(f'test_{user}.parquet', index=True)


# now load them into a dask dataframe
import dask.dataframe as dd

ddf = dd.read_parquet('test_*.parquet')

# dask will know about the divisions
print(ddf.known_divisions) # True

# further evidence
print(ddf.divisions) # (0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3)

# this should show three partitions, but will show only one
print(ddf.loc[0].npartitions) # 1

我刚刚注意到 Dask 的 shuffle 文档说

After this operation, rows with the same value of on will be in the same partition.

这似乎证实了我的经验观察。

is it the case that a single index can never be in two different partitions?

不,这当然是允许的。达斯克甚至打算让这件事发生。但是,由于 set_index 中的 bug,所有数据仍将在一个分区中结束。

一个极端的例子(每一行除了一个都是相同的值):

In [1]: import dask.dataframe as dd
In [2]: import pandas as pd
In [3]: df = pd.DataFrame({"A": [0] + [1] * 20})
In [4]: ddf = dd.from_pandas(df, npartitions=10)
In [5]: s = ddf.set_index("A")
In [6]: s.divisions
Out[6]: (0, 0, 0, 0, 0, 0, 0, 1)

如您所见,Dask 打算将 0 拆分为多个分区。然而,当洗牌真正发生时,所有 0s 仍然在一个分区中结束:

In [7]: import dask
In [8]: dask.compute(s.to_delayed())  # easy way to see the partitions separately
Out[8]: 
([Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]],)

这是因为 code deciding which output partition a row belongs doesn't consider duplicates in divisions. Treating divisions as a Series, it uses searchsortedside="right",因此所有数据总是在最后一个分区结束。

问题解决后我会更新这个答案。