Remove empty partitions in Dask

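When loading data from CSV files, some of the CSVs fail to load, which leaves me with empty partitions. I would like to remove all empty partitions, because some methods don't seem to work well on them. I have tried repartitioning: (for example) repartition(npartitions=10) works, but values greater than this can still result in empty partitions.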

What's the best way of achieving this? Thanks.

There is no simple API to do this. You can call df.map_partitions(len) to determine which partitions are empty and then remove them explicitly, perhaps by using df.to_delayed() and dask.dataframe.from_delayed(...).

In the future, we would appreciate it if you raised an issue whenever you find a function that doesn't handle empty partitions well. https://github.com/dask/dask/issues/new

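I've found that filtering a Dask dataframe by date often results in empty partitions. If you have trouble using a dataframe with empty partitions, here is a function, based on MRocklin's guidance, to cull them: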

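import dask.dataframe as dd

def cull_empty_partitions(df):
    # Row count of every partition (this triggers a computation)
    ll = list(df.map_partitions(len).compute())
    df_delayed = df.to_delayed()
    df_delayed_new = list()
    pempty = None
    for ix, n in enumerate(ll):
        if 0 == n:
            # Remember one empty partition to use as the meta template
            pempty = df.get_partition(ix)
        else:
            df_delayed_new.append(df_delayed[ix])
    # Rebuild only if something was dropped and at least one partition is left
    if pempty is not None and df_delayed_new:
        df = dd.from_delayed(df_delayed_new, meta=pempty)
    return df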

For anyone working with Bags (not DataFrames), this function will do the trick:

import dask.bag

def cull_empty_partitions(bag):
    """
    When bags are created by filtering or grouping from a different bag,
    it retains the original bag's partition count, even if a lot of the
    partitions become empty.
    Those extra partitions add overhead, so it's nice to discard them.
    This function drops the empty partitions.
    """
    bag = bag.persist()
    def get_len(partition):
        # If the bag is the result of bag.filter(),
        # then each partition is actually a 'filter' object,
        # which has no __len__.
        # In that case, we must convert it to a list first.
        if hasattr(partition, '__len__'):
            return len(partition)
        return len(list(partition))
    partition_lengths = bag.map_partitions(get_len).compute()

    # Convert bag partitions into a list of 'delayed' objects
    lengths_and_partitions = zip(partition_lengths, bag.to_delayed())

    # Drop the ones with empty partitions
    partitions = [p for l, p in lengths_and_partitions if l > 0]

    # Convert from list of delayed objects back into a Bag.
    return dask.bag.from_delayed(partitions)
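
A short usage sketch of the same idea on a toy bag, with the steps inlined rather than calling the function above. The data and variable names are made up for illustration, and scheduler="synchronous" is used only to keep the sketch in a single process:

```python
import dask.bag as db

# Hypothetical toy data: 8 items in 4 partitions.
bag = db.from_sequence(range(8), npartitions=4)

# filter() keeps the partition count even when partitions end up empty.
filtered = bag.filter(lambda x: x >= 6)

# Count items per partition. list() is needed because a filtered
# partition may be a lazy iterator without __len__.
lengths = filtered.map_partitions(lambda part: len(list(part))).compute(
    scheduler="synchronous"
)

# Keep only the delayed partitions that hold at least one item.
kept = [p for n, p in zip(lengths, filtered.to_delayed()) if n > 0]
culled = db.from_delayed(kept)

print(filtered.npartitions, culled.npartitions)
print(culled.compute(scheduler="synchronous"))
```

Unlike the function above, this sketch does not call persist(), so the filter runs once for the counts and again when the culled bag is computed.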

Here is my attempt at removing empty partitions:

import numpy as np
import dask.dataframe as dd

def remove_empty_partitions(ddf):
    """ remove empty partitions """
    partition_lens = ddf.map_partitions(len).compute()
    # np.where returns a tuple of arrays; [0] gives the positions themselves
    ids_of_empty_partitions = np.where(partition_lens == 0)[0]
    if len(ids_of_empty_partitions) == len(partition_lens):
        # all partitions are empty: keep a single one
        ddf_nonzero = ddf.partitions[0]
    elif len(ids_of_empty_partitions) > 0:
        ddf_nonzero = dd.concat([
            ddf.get_partition(num_partition)
            for num_partition in range(ddf.npartitions)
            if num_partition not in ids_of_empty_partitions
        ])
    else:
        # no empty partitions: nothing to remove
        ddf_nonzero = ddf
    return ddf_nonzero

FWIW, @tpegbert's answer seems to be more efficient in terms of the number of tasks needed to obtain the filtered dataframe.