在 Dask 中返回数据框
Returning a dataframe in Dask
目标: 加快跨大型数据帧(190 万~行)逐行应用函数的速度
尝试: 使用 dask map_partitions,其中分区 == 核心数。我编写了一个应用于每一行的函数,创建了一个包含可变数量新值(1 到 55 之间)的字典。此功能独立运行良好。
问题:我需要一种方法将每个函数的输出组合成最终数据帧。我尝试使用 df.append,我将每个字典附加到一个新的数据框和 return 这个数据框。如果我了解 Dask Docs,Dask 应该将它们组合成一个大 DF。不幸的是,这条线触发了一个错误(ValueError:无法将输入数组从形状 (56) 广播到形状 (1))。这让我相信它与 Dask 中的组合功能有关?
#Function to applied row wise down the dataframe. Takes a column (post) and new empty df.
def func(post,New_DF):
post = str(post)
scores = OtherFUNC.countWords(post)
scores['post'] = post
New_DF = New_DF.append(scores, ignore_index=True)
return(New_DF)
#Dask
dd.from_pandas(dataset,npartitions=nCores).\
map_partitions(
lambda df : df.apply(
lambda x : func(x.post,New_DF),axis=1)).\
compute(get=get)
我不太确定我是否完全理解您的代码而不是 MCVE,但我认为这里存在一些误解。
在这段代码中,您获取一行和一个 DataFrame,并将一行附加到该 DataFrame。
#Function to applied row wise down the dataframe. Takes a column (post) and new empty df.
def func(post,New_DF):
post = str(post)
scores = OtherFUNC.countWords(post)
scores['post'] = post
New_DF = New_DF.append(scores, ignore_index=True)
return(New_DF)
与其附加到 New_DF
,我建议只 returning 一个 pd.Series
,df.apply
连接成一个 DataFrame
。这是因为如果您在所有 nCores
分区中追加到同一个 New_DF
对象,您肯定会 运行 陷入困境。
#Function to applied row wise down the dataframe. Takes a row and returns a row.
def tobsecret_func(row):
post = str(row.post)
scores = OtherFUNC.countWords(post)
scores['post'] = post
length_adjusted_series = pd.Series(scores).reindex(range(55))
return(length_adjusted_series)
您的错误还表明,正如您在问题中所写,您的函数创建了可变数量的值。如果 pd.Series
你 return 没有相同的形状和列名,那么 df.apply
将无法将它们连接成 pd.DataFrame
。因此,请确保每次 return 一个 pd.Series
形状相同。这个问题向您展示了如何创建长度和索引相等的 pd.Series
:
我不知道你的 OtherFUNC.countWords
return 到底是哪种 dict
,所以你可能需要调整行:
length_adjusted_series = pd.Series(scores).reindex(range(55))
照原样,该行将 return 具有索引 0、1、2、...、54 和最多 55 个值的系列(如果字典最初的键少于 55 个,则剩余的单元格将包含 NaN
个值)。
这意味着在应用于 DataFrame
之后,该 DataFrame 的列将被命名为 0、1、2、...、54。
现在您使用 dataset
并将您的函数映射到每个分区,并在每个分区中使用 apply
将其应用到 DataFrame
。
#Dask
dd.from_pandas(dataset,npartitions=nCores).\
map_partitions(
lambda df : df.apply(
lambda x : func(x.post,New_DF),axis=1)).\
compute(get=get)
map_partitions
需要一个将 DataFrame 作为输入并输出 DataFrame 的函数。您的函数通过使用 lambda 函数来执行此操作,该函数基本上调用您的其他函数并将其应用于 DataFrame,而 DataFrame 又 return 是一个 DataFrame。这行得通,但我强烈建议编写一个命名函数,它将一个 DataFrame 作为输入并输出一个 DataFrame,它使您更容易调试代码。
例如,使用像这样的简单包装函数:
df_wise(df):
return df.apply(tobsecret_func)
特别是当您的代码变得越来越复杂时,请避免使用 lambda
调用非平凡代码的函数,例如您的自定义 func
,而是创建一个简单的命名函数可以帮助您进行调试,因为回溯不仅会像您的代码中那样引导您进入包含一堆 lambda 函数的行,还会直接指向命名函数 df_wise
,因此您会确切地看到错误的来源。
#Dask
dd.from_pandas(dataset,npartitions=nCores).\
map_partitions(df_wise,
meta=df_wise(dd.head())
).\
compute(get=get)
请注意,我们刚刚将 dd.head()
馈送到 df_wise
以创建我们的元关键字,这类似于 Dask 在幕后所做的事情。
您正在使用 dask.get,同步调度程序,这就是为什么整个 New_DF.append(...) 代码可以工作的原因,因为您为每个连续的分区附加到 DataFrame。
这不会给你任何并行性,因此如果你使用其他调度器之一,所有这些调度器都会并行化你的代码,那么它将不起作用。
documentation 还提到了 meta
关键字参数,您应该将其提供给 map_partitions
调用,以便 dask 知道您的 DataFrame 将包含哪些列。如果你不这样做,dask 将首先必须在其中一个分区上对你的函数进行试验 运行 并检查输出的形状,然后才能继续执行其他分区。如果您的分区很大,这会大大降低您的代码速度;给出 meta
关键字可以绕过这个不必要的 dask 计算。
目标: 加快跨大型数据帧(190 万~行)逐行应用函数的速度
尝试: 使用 dask map_partitions,其中分区 == 核心数。我编写了一个应用于每一行的函数,创建了一个包含可变数量新值(1 到 55 之间)的字典。此功能独立运行良好。
问题:我需要一种方法将每个函数的输出组合成最终数据帧。我尝试使用 df.append,我将每个字典附加到一个新的数据框和 return 这个数据框。如果我了解 Dask Docs,Dask 应该将它们组合成一个大 DF。不幸的是,这条线触发了一个错误(ValueError:无法将输入数组从形状 (56) 广播到形状 (1))。这让我相信它与 Dask 中的组合功能有关?
#Function to applied row wise down the dataframe. Takes a column (post) and new empty df.
def func(post,New_DF):
post = str(post)
scores = OtherFUNC.countWords(post)
scores['post'] = post
New_DF = New_DF.append(scores, ignore_index=True)
return(New_DF)
#Dask
dd.from_pandas(dataset,npartitions=nCores).\
map_partitions(
lambda df : df.apply(
lambda x : func(x.post,New_DF),axis=1)).\
compute(get=get)
我不太确定我是否完全理解您的代码而不是 MCVE,但我认为这里存在一些误解。
在这段代码中,您获取一行和一个 DataFrame,并将一行附加到该 DataFrame。
#Function to applied row wise down the dataframe. Takes a column (post) and new empty df.
def func(post,New_DF):
post = str(post)
scores = OtherFUNC.countWords(post)
scores['post'] = post
New_DF = New_DF.append(scores, ignore_index=True)
return(New_DF)
与其附加到 New_DF
,我建议只 returning 一个 pd.Series
,df.apply
连接成一个 DataFrame
。这是因为如果您在所有 nCores
分区中追加到同一个 New_DF
对象,您肯定会 运行 陷入困境。
#Function to applied row wise down the dataframe. Takes a row and returns a row.
def tobsecret_func(row):
post = str(row.post)
scores = OtherFUNC.countWords(post)
scores['post'] = post
length_adjusted_series = pd.Series(scores).reindex(range(55))
return(length_adjusted_series)
您的错误还表明,正如您在问题中所写,您的函数创建了可变数量的值。如果 pd.Series
你 return 没有相同的形状和列名,那么 df.apply
将无法将它们连接成 pd.DataFrame
。因此,请确保每次 return 一个 pd.Series
形状相同。这个问题向您展示了如何创建长度和索引相等的 pd.Series
:
我不知道你的 OtherFUNC.countWords
return 到底是哪种 dict
,所以你可能需要调整行:
length_adjusted_series = pd.Series(scores).reindex(range(55))
照原样,该行将 return 具有索引 0、1、2、...、54 和最多 55 个值的系列(如果字典最初的键少于 55 个,则剩余的单元格将包含 NaN
个值)。
这意味着在应用于 DataFrame
之后,该 DataFrame 的列将被命名为 0、1、2、...、54。
现在您使用 dataset
并将您的函数映射到每个分区,并在每个分区中使用 apply
将其应用到 DataFrame
。
#Dask
dd.from_pandas(dataset,npartitions=nCores).\
map_partitions(
lambda df : df.apply(
lambda x : func(x.post,New_DF),axis=1)).\
compute(get=get)
map_partitions
需要一个将 DataFrame 作为输入并输出 DataFrame 的函数。您的函数通过使用 lambda 函数来执行此操作,该函数基本上调用您的其他函数并将其应用于 DataFrame,而 DataFrame 又 return 是一个 DataFrame。这行得通,但我强烈建议编写一个命名函数,它将一个 DataFrame 作为输入并输出一个 DataFrame,它使您更容易调试代码。
例如,使用像这样的简单包装函数:
df_wise(df):
return df.apply(tobsecret_func)
特别是当您的代码变得越来越复杂时,请避免使用 lambda
调用非平凡代码的函数,例如您的自定义 func
,而是创建一个简单的命名函数可以帮助您进行调试,因为回溯不仅会像您的代码中那样引导您进入包含一堆 lambda 函数的行,还会直接指向命名函数 df_wise
,因此您会确切地看到错误的来源。
#Dask
dd.from_pandas(dataset,npartitions=nCores).\
map_partitions(df_wise,
meta=df_wise(dd.head())
).\
compute(get=get)
请注意,我们刚刚将 dd.head()
馈送到 df_wise
以创建我们的元关键字,这类似于 Dask 在幕后所做的事情。
您正在使用 dask.get,同步调度程序,这就是为什么整个 New_DF.append(...) 代码可以工作的原因,因为您为每个连续的分区附加到 DataFrame。
这不会给你任何并行性,因此如果你使用其他调度器之一,所有这些调度器都会并行化你的代码,那么它将不起作用。
documentation 还提到了 meta
关键字参数,您应该将其提供给 map_partitions
调用,以便 dask 知道您的 DataFrame 将包含哪些列。如果你不这样做,dask 将首先必须在其中一个分区上对你的函数进行试验 运行 并检查输出的形状,然后才能继续执行其他分区。如果您的分区很大,这会大大降低您的代码速度;给出 meta
关键字可以绕过这个不必要的 dask 计算。