如何在 Dask 中 sort_index、groupby 和应用函数?
How do I sort_index, groupby and apply a function in Dask?
我有一个应用于 pandas 数据帧的函数,我正在考虑使用 dask 来提高性能
这是我现有的代码:
df.reset_index(
level=0,
inplace=True,
)
df = df.sort_index().groupby(
['col1', 'col2', 'col3'],
as_index=False).apply(
myfunction
)
我正在尝试将其转换为 dask 语法并设法达到:
from dask import dataframe as dd
from multiprocessing import cpu_count
nCores = cpu_count()
df = dd.from_pandas(
df,
npartitions=nCores
).reset_index().set_index().groupby(
['col1', 'col2', 'col3']
).apply(
myfunction
).compute()
看来您只能将一列传递给 set_index
并且在 dask 中没有 sort_index()
的等价物。我如何用 dask 语法编写此 pandas 代码?
对于任何对类似解决方案感兴趣的人,这里是 dask 语法中有效的代码版本,请注意我在将索引传递给 dask 之前对索引进行了排序。
from dask import dataframe as dd
from multiprocessing import cpu_count
nCores = cpu_count()
df.sort_index(inplace=True)
df = dd.from_pandas(
df,
npartitions=nCores
).map_partitions(
lambda df : df.groupby(
['col1', 'col2', 'col3']
).apply(
my_function
)
).compute()
重要的是,dask 版本并不比 pandas 方法快;但非常接近。 my_function
在这种情况下被矢量化并主要使用 numpy 数组应用于每个 groupby 对象。
我有一个应用于 pandas 数据帧的函数,我正在考虑使用 dask 来提高性能
这是我现有的代码:
df.reset_index(
level=0,
inplace=True,
)
df = df.sort_index().groupby(
['col1', 'col2', 'col3'],
as_index=False).apply(
myfunction
)
我正在尝试将其转换为 dask 语法并设法达到:
from dask import dataframe as dd
from multiprocessing import cpu_count
nCores = cpu_count()
df = dd.from_pandas(
df,
npartitions=nCores
).reset_index().set_index().groupby(
['col1', 'col2', 'col3']
).apply(
myfunction
).compute()
看来您只能将一列传递给 set_index
并且在 dask 中没有 sort_index()
的等价物。我如何用 dask 语法编写此 pandas 代码?
对于任何对类似解决方案感兴趣的人,这里是 dask 语法中有效的代码版本,请注意我在将索引传递给 dask 之前对索引进行了排序。
from dask import dataframe as dd
from multiprocessing import cpu_count
nCores = cpu_count()
df.sort_index(inplace=True)
df = dd.from_pandas(
df,
npartitions=nCores
).map_partitions(
lambda df : df.groupby(
['col1', 'col2', 'col3']
).apply(
my_function
)
).compute()
重要的是,dask 版本并不比 pandas 方法快;但非常接近。 my_function
在这种情况下被矢量化并主要使用 numpy 数组应用于每个 groupby 对象。