Dask DataFrame 过滤器和重新分区给出了一些空分区
Dask DataFrame filter and repartition gives some empty partitions
我正在尝试过滤一个 Dask DataFrame
,然后使用 map_partitions
对每个分区应用一个函数。该函数需要一个至少有 1 行的 pandas DataFrame
。
这是为 MCVE
生成一些虚拟数据作为 pandas
DataFrame
(然后转换为 Dask DataFrame
)的代码
def create_data(n):
df = pd.DataFrame(np.random.rand(6 * n), columns=["A"])
random_integers = np.random.default_rng().choice(14, size=n, replace=False)
df.insert(0, 'store_id', [d for s in random_integers for d in [s] * 6])
return df
df = create_data(n=10)
print(df.head(15))
>>>
store_id A
0 10 0.850730
1 10 0.581119
2 10 0.825802
3 10 0.657797
4 10 0.291961
5 10 0.864984
6 9 0.161334
7 9 0.397162
8 9 0.089300
9 9 0.435914
10 9 0.750741
11 9 0.920625
12 3 0.635727
13 3 0.425270
14 3 0.904043
数据结构:对于每个 store_id
,恰好有 6 行。
现在我创建了一个包含一些 store_id
的列表,我想用它来过滤上述数据
filtered_store_ids = df["store_id"].value_counts().index[:6].tolist()
print(filtered_store_ids)
>>> [13, 12, 11, 10, 9, 7]
然后我把上面的数据(一个pandasDataFrame
)转换成一个dask.dataframe
ddf = dd.from_pandas(df, npartitions=10)
现在我打印 ddf
的分区
for p in range(ddf.npartitions):
print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=6
Partition Index=1, Number of Rows=6
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=6
Partition Index=5, Number of Rows=6
Partition Index=6, Number of Rows=6
Partition Index=7, Number of Rows=6
Partition Index=8, Number of Rows=6
Partition Index=9, Number of Rows=6
这是意料之中的。每个分区有 6 行和一个(唯一)store_id
。因此,每个分区包含单个 store_id
.
的数据
我现在使用上面的 store_id
列表过滤 Dask 数据帧
ddf = ddf[ddf["store_id"].isin(filtered_store_ids)]
我再次打印过滤后的分区 ddf
for p in range(ddf.npartitions):
print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=0
Partition Index=1, Number of Rows=0
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=0
Partition Index=5, Number of Rows=6
Partition Index=6, Number of Rows=6
Partition Index=7, Number of Rows=6
Partition Index=8, Number of Rows=0
Partition Index=9, Number of Rows=6
这是预期的,因为每个分区都有一个 store_id
,并且通过过滤,一些分区将被完全过滤掉,因此它们将包含零行。
所以,现在我将根据 Dask DataFrame best practices
对过滤后的 Dataframe
重新分区
ddf = ddf.repartition(npartitions=len(filtered_store_ids))
print(ddf)
>>>
Dask DataFrame Structure:
store_id A
npartitions=6
0 int64 float64
6 ... ...
... ... ...
48 ... ...
59 ... ...
Dask Name: repartition, 47 tasks
我预计这次重新分区操作只会产生大小均匀的非空分区。 但是,现在当我重新打印分区时,我得到了与前一个类似的输出(分区大小不均匀和一些空分区),就好像重新分区没有发生一样
for p in range(ddf.npartitions):
print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=0
Partition Index=1, Number of Rows=6
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=12
Partition Index=5, Number of Rows=6
我的下一步是在过滤后对每个分区应用一个函数,但这不会起作用,因为有些分区 (pandas DataFrame
s) 该函数无法按原样处理缺少行。
def myadd(df):
assert df.shape[0] > 0
...
return ...
ddf.map_partitions(myadd)
>>> AssertionError Traceback (most recent call last)
.
.
.
AssertionError:
用于重新分区的 Dask 文档是 well-explained(与我上面链接的最佳实践相同),它看起来很简单,但是在重新分区之后,我仍然得到一些零行的分区map_partitions
会在这里失败。我确定我在这里遗漏了一些东西。
有一些关于重新分区的 SO 帖子 (1, ),但它们不处理空分区。
问题
有没有办法保证重新分区后,所有分区都重新有6行,没有空分区?即是否有可能重新分区的 Dask DataFrame
具有相同大小的(非空)分区?
编辑
我从 SO
找到了两个现有帖子
- 使用
删除空分区
- 使用
_rebalance_ddf()
重新平衡以获得均匀的分区大小
- 警告 - 此函数需要计算
我使用它们来解决这个问题。
从问题的原始代码开始(无需更改)
.
<identical code from question here>
.
ddf = ddf.repartition(npartitions=len(filtered_store_ids))
接下来,我就在重新分区的上面连续调用这两个函数了ddf
ddf = cull_empty_partitions(ddf) # remove empties
ddf = _rebalance_ddf(ddf) # re-size
当我现在重新打印分区大小时,所有分区大小均匀,none 为空
for p in range(ddf.npartitions):
print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=6
Partition Index=1, Number of Rows=6
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=6
Partition Index=5, Number of Rows=6
我正在尝试过滤一个 Dask DataFrame
,然后使用 map_partitions
对每个分区应用一个函数。该函数需要一个至少有 1 行的 pandas DataFrame
。
这是为 MCVE
生成一些虚拟数据作为pandas
DataFrame
(然后转换为 Dask DataFrame
)的代码
def create_data(n):
df = pd.DataFrame(np.random.rand(6 * n), columns=["A"])
random_integers = np.random.default_rng().choice(14, size=n, replace=False)
df.insert(0, 'store_id', [d for s in random_integers for d in [s] * 6])
return df
df = create_data(n=10)
print(df.head(15))
>>>
store_id A
0 10 0.850730
1 10 0.581119
2 10 0.825802
3 10 0.657797
4 10 0.291961
5 10 0.864984
6 9 0.161334
7 9 0.397162
8 9 0.089300
9 9 0.435914
10 9 0.750741
11 9 0.920625
12 3 0.635727
13 3 0.425270
14 3 0.904043
数据结构:对于每个 store_id
,恰好有 6 行。
现在我创建了一个包含一些 store_id
的列表,我想用它来过滤上述数据
filtered_store_ids = df["store_id"].value_counts().index[:6].tolist()
print(filtered_store_ids)
>>> [13, 12, 11, 10, 9, 7]
然后我把上面的数据(一个pandasDataFrame
)转换成一个dask.dataframe
ddf = dd.from_pandas(df, npartitions=10)
现在我打印 ddf
for p in range(ddf.npartitions):
print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=6
Partition Index=1, Number of Rows=6
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=6
Partition Index=5, Number of Rows=6
Partition Index=6, Number of Rows=6
Partition Index=7, Number of Rows=6
Partition Index=8, Number of Rows=6
Partition Index=9, Number of Rows=6
这是意料之中的。每个分区有 6 行和一个(唯一)store_id
。因此,每个分区包含单个 store_id
.
我现在使用上面的 store_id
列表过滤 Dask 数据帧
ddf = ddf[ddf["store_id"].isin(filtered_store_ids)]
我再次打印过滤后的分区 ddf
for p in range(ddf.npartitions):
print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=0
Partition Index=1, Number of Rows=0
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=0
Partition Index=5, Number of Rows=6
Partition Index=6, Number of Rows=6
Partition Index=7, Number of Rows=6
Partition Index=8, Number of Rows=0
Partition Index=9, Number of Rows=6
这是预期的,因为每个分区都有一个 store_id
,并且通过过滤,一些分区将被完全过滤掉,因此它们将包含零行。
所以,现在我将根据 Dask DataFrame best practices
对过滤后的Dataframe
重新分区
ddf = ddf.repartition(npartitions=len(filtered_store_ids))
print(ddf)
>>>
Dask DataFrame Structure:
store_id A
npartitions=6
0 int64 float64
6 ... ...
... ... ...
48 ... ...
59 ... ...
Dask Name: repartition, 47 tasks
我预计这次重新分区操作只会产生大小均匀的非空分区。 但是,现在当我重新打印分区时,我得到了与前一个类似的输出(分区大小不均匀和一些空分区),就好像重新分区没有发生一样
for p in range(ddf.npartitions):
print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=0
Partition Index=1, Number of Rows=6
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=12
Partition Index=5, Number of Rows=6
我的下一步是在过滤后对每个分区应用一个函数,但这不会起作用,因为有些分区 (pandas DataFrame
s) 该函数无法按原样处理缺少行。
def myadd(df):
assert df.shape[0] > 0
...
return ...
ddf.map_partitions(myadd)
>>> AssertionError Traceback (most recent call last)
.
.
.
AssertionError:
用于重新分区的 Dask 文档是 well-explained(与我上面链接的最佳实践相同),它看起来很简单,但是在重新分区之后,我仍然得到一些零行的分区map_partitions
会在这里失败。我确定我在这里遗漏了一些东西。
有一些关于重新分区的 SO 帖子 (1,
问题
有没有办法保证重新分区后,所有分区都重新有6行,没有空分区?即是否有可能重新分区的 Dask DataFrame
具有相同大小的(非空)分区?
编辑
我从 SO
找到了两个现有帖子- 使用
- 使用
_rebalance_ddf()
重新平衡以获得均匀的分区大小- 警告 - 此函数需要计算
我使用它们来解决这个问题。
从问题的原始代码开始(无需更改)
.
<identical code from question here>
.
ddf = ddf.repartition(npartitions=len(filtered_store_ids))
接下来,我就在重新分区的上面连续调用这两个函数了ddf
ddf = cull_empty_partitions(ddf) # remove empties
ddf = _rebalance_ddf(ddf) # re-size
当我现在重新打印分区大小时,所有分区大小均匀,none 为空
for p in range(ddf.npartitions):
print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=6
Partition Index=1, Number of Rows=6
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=6
Partition Index=5, Number of Rows=6