在 Dask 中返回数据框

Returning a dataframe in Dask

目标: 加快跨大型数据帧(190 万~行)逐行应用函数的速度

尝试: 使用 dask map_partitions,其中分区 == 核心数。我编写了一个应用于每一行的函数,创建了一个包含可变数量新值(1 到 55 之间)的字典。此功能独立运行良好。

问题:我需要一种方法将每个函数的输出组合成最终数据帧。我尝试使用 df.append,我将每个字典附加到一个新的数据框和 return 这个数据框。如果我了解 Dask Docs,Dask 应该将它们组合成一个大 DF。不幸的是,这条线触发了一个错误(ValueError:无法将输入数组从形状 (56) 广播到形状 (1))。这让我相信它与 Dask 中的组合功能有关?

#Function to applied row wise down the dataframe. Takes a column (post) and new empty df. 
def func(post,New_DF):
    post = str(post)
    scores = OtherFUNC.countWords(post)
    scores['post'] = post
    New_DF = New_DF.append(scores, ignore_index=True)
    return(New_DF)

#Dask 
dd.from_pandas(dataset,npartitions=nCores).\
 map_partitions(
      lambda df : df.apply(
         lambda x : func(x.post,New_DF),axis=1)).\
   compute(get=get)

我不太确定我是否完全理解您的代码而不是 MCVE,但我认为这里存在一些误解。

在这段代码中,您获取一行和一个 DataFrame,并将一行附加到该 DataFrame。

#Function to applied row wise down the dataframe. Takes a column (post) and new empty df. 
def func(post,New_DF):
    post = str(post)
    scores = OtherFUNC.countWords(post)
    scores['post'] = post
    New_DF = New_DF.append(scores, ignore_index=True)
    return(New_DF)

与其附加到 New_DF,我建议只 returning 一个 pd.Seriesdf.apply 连接成一个 DataFrame。这是因为如果您在所有 nCores 分区中追加到同一个 New_DF 对象,您肯定会 运行 陷入困境。

 #Function to applied row wise down the dataframe. Takes a row and returns a row. 
def tobsecret_func(row):
    post = str(row.post)
    scores = OtherFUNC.countWords(post)
    scores['post'] = post
    length_adjusted_series = pd.Series(scores).reindex(range(55))
    return(length_adjusted_series)

您的错误还表明,正如您在问题中所写,您的函数创建了可变数量的值。如果 pd.Series 你 return 没有相同的形状和列名,那么 df.apply 将无法将它们连接成 pd.DataFrame。因此,请确保每次 return 一个 pd.Series 形状相同。这个问题向您展示了如何创建长度和索引相等的 pd.Series

我不知道你的 OtherFUNC.countWords return 到底是哪种 dict,所以你可能需要调整行: length_adjusted_series = pd.Series(scores).reindex(range(55))

照原样,该行将 return 具有索引 0、1、2、...、54 和最多 55 个值的系列(如果字典最初的键少于 55 个,则剩余的单元格将包含 NaN 个值)。 这意味着在应用于 DataFrame 之后,该 DataFrame 的列将被命名为 0、1、2、...、54。

现在您使用 dataset 并将您的函数映射到每个分区,并在每个分区中使用 apply 将其应用到 DataFrame

#Dask 
dd.from_pandas(dataset,npartitions=nCores).\
 map_partitions(
      lambda df : df.apply(
         lambda x : func(x.post,New_DF),axis=1)).\
   compute(get=get)

map_partitions 需要一个将 DataFrame 作为输入并输出 DataFrame 的函数。您的函数通过使用 lambda 函数来执行此操作,该函数基本上调用您的其他函数并将其应用于 DataFrame,而 DataFrame 又 return 是一个 DataFrame。这行得通,但我强烈建议编写一个命名函数,它将一个 DataFrame 作为输入并输出一个 DataFrame,它使您更容易调试代码。

例如,使用像这样的简单包装函数:

df_wise(df):
    return df.apply(tobsecret_func)

特别是当您的代码变得越来越复杂时,请避免使用 lambda 调用非平凡代码的函数,例如您的自定义 func,而是创建一个简单的命名函数可以帮助您进行调试,因为回溯不仅会像您的代码中那样引导您进入包含一堆 lambda 函数的行,还会直接指向命名函数 df_wise,因此您会确切地看到错误的来源。

#Dask 
dd.from_pandas(dataset,npartitions=nCores).\
 map_partitions(df_wise, 
                meta=df_wise(dd.head())
                ).\
   compute(get=get)

请注意,我们刚刚将 dd.head() 馈送到 df_wise 以创建我们的元关键字,这类似于 Dask 在幕后所做的事情。

您正在使用 dask.get,同步调度程序,这就是为什么整个 New_DF.append(...) 代码可以工作的原因,因为您为每个连续的分区附加到 DataFrame。

这不会给你任何并行性,因此如果你使用其他调度器之一,所有这些调度器都会并行化你的代码,那么它将不起作用。

documentation 还提到了 meta 关键字参数,您应该将其提供给 map_partitions 调用,以便 dask 知道您的 DataFrame 将包含哪些列。如果你不这样做,dask 将首先必须在其中一个分区上对你的函数进行试验 运行 并检查输出的形状,然后才能继续执行其他分区。如果您的分区很大,这会大大降低您的代码速度;给出 meta 关键字可以绕过这个不必要的 dask 计算。