python dask DataFrame,支持(普通可并行化)行应用?
python dask DataFrame, support for (trivially parallelizable) row apply?
我最近发现了 dask 模块,旨在成为一个易于使用的 python 并行处理模块。对我来说最大的卖点是它适用于 pandas.
在其手册页上阅读了一些之后,我找不到一种方法来完成这个简单的可并行化任务:
ts.apply(func) # for pandas series
df.apply(func, axis = 1) # for pandas DF row apply
目前,要迅速实现这一目标,据我所知,
ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame
这是一种丑陋的语法,实际上比直接
慢
df.apply(func, axis = 1) # for pandas DF row apply
有什么建议吗?
编辑:感谢@MRocklin 的地图功能。它似乎比普通 pandas apply 慢。这与 pandas GIL 释放问题有关还是我做错了?
import dask.dataframe as dd
s = pd.Series([10000]*120)
ds = dd.from_pandas(s, npartitions = 3)
def slow_func(k):
A = np.random.normal(size = k) # k = 10000
s = 0
for a in A:
if a > 0:
s += 1
else:
s -= 1
return s
s.apply(slow_func) # 0.43 sec
ds.map(slow_func).compute() # 2.04 sec
map_partitions
您可以使用 map_partitions
函数将函数应用于数据框的所有分区。
df.map_partitions(func, columns=...)
请注意,一次只会给 func 数据集的一部分,而不是像 pandas apply
那样的整个数据集(如果你想进行并行处理,你可能不想要。)
map
/ apply
您可以使用 map
将函数逐行映射到一个序列中
df.mycolumn.map(func)
您可以使用 apply
在数据框中按行映射函数
df.apply(func, axis=1)
线程与进程
从版本 0.6.0 开始,dask.dataframes
与线程并行化。自定义 Python 函数不会从基于线程的并行性中获得太多好处。您可以尝试使用流程
df = dd.read_csv(...)
df.map_partitions(func, columns=...).compute(scheduler='processes')
但避免apply
但是,在 Pandas 和 Dask 中,您真的应该避免 apply
使用自定义 Python 函数。这通常是性能不佳的根源。可能是,如果您找到一种以矢量化方式进行操作的方法,那么您的 Pandas 代码可能会快 100 倍,您根本不需要 dask.dataframe。
考虑numba
对于您的特定问题,您可以考虑 numba
。这会显着提高您的表现。
In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: s = pd.Series([10000]*120)
In [4]: %paste
def slow_func(k):
A = np.random.normal(size = k) # k = 10000
s = 0
for a in A:
if a > 0:
s += 1
else:
s -= 1
return s
## -- End pasted text --
In [5]: %time _ = s.apply(slow_func)
CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms
Wall time: 347 ms
In [6]: import numba
In [7]: fast_func = numba.jit(slow_func)
In [8]: %time _ = s.apply(fast_func) # First time incurs compilation overhead
CPU times: user 179 ms, sys: 0 ns, total: 179 ms
Wall time: 175 ms
In [9]: %time _ = s.apply(fast_func) # Subsequent times are all gain
CPU times: user 68.8 ms, sys: 27 µs, total: 68.8 ms
Wall time: 68.7 ms
免责声明,我在同时生产 numba
和 dask
的公司工作,并雇用了许多 pandas
开发人员。
从 v dask.dataframe
开始,将责任委托给 map_partitions
:
@insert_meta_param_description(pad=12)
def apply(self, func, convert_dtype=True, meta=no_default, args=(), **kwds):
""" Parallel version of pandas.Series.apply
...
"""
if meta is no_default:
msg = ("`meta` is not specified, inferred from partial data. "
"Please provide `meta` if the result is unexpected.\n"
" Before: .apply(func)\n"
" After: .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result\n"
" or: .apply(func, meta=('x', 'f8')) for series result")
warnings.warn(msg)
meta = _emulate(M.apply, self._meta_nonempty, func,
convert_dtype=convert_dtype,
args=args, **kwds)
return map_partitions(M.apply, self, func,
convert_dtype, args, meta=meta, **kwds)
我最近发现了 dask 模块,旨在成为一个易于使用的 python 并行处理模块。对我来说最大的卖点是它适用于 pandas.
在其手册页上阅读了一些之后,我找不到一种方法来完成这个简单的可并行化任务:
ts.apply(func) # for pandas series
df.apply(func, axis = 1) # for pandas DF row apply
目前,要迅速实现这一目标,据我所知,
ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame
这是一种丑陋的语法,实际上比直接
慢df.apply(func, axis = 1) # for pandas DF row apply
有什么建议吗?
编辑:感谢@MRocklin 的地图功能。它似乎比普通 pandas apply 慢。这与 pandas GIL 释放问题有关还是我做错了?
import dask.dataframe as dd
s = pd.Series([10000]*120)
ds = dd.from_pandas(s, npartitions = 3)
def slow_func(k):
A = np.random.normal(size = k) # k = 10000
s = 0
for a in A:
if a > 0:
s += 1
else:
s -= 1
return s
s.apply(slow_func) # 0.43 sec
ds.map(slow_func).compute() # 2.04 sec
map_partitions
您可以使用 map_partitions
函数将函数应用于数据框的所有分区。
df.map_partitions(func, columns=...)
请注意,一次只会给 func 数据集的一部分,而不是像 pandas apply
那样的整个数据集(如果你想进行并行处理,你可能不想要。)
map
/ apply
您可以使用 map
df.mycolumn.map(func)
您可以使用 apply
df.apply(func, axis=1)
线程与进程
从版本 0.6.0 开始,dask.dataframes
与线程并行化。自定义 Python 函数不会从基于线程的并行性中获得太多好处。您可以尝试使用流程
df = dd.read_csv(...)
df.map_partitions(func, columns=...).compute(scheduler='processes')
但避免apply
但是,在 Pandas 和 Dask 中,您真的应该避免 apply
使用自定义 Python 函数。这通常是性能不佳的根源。可能是,如果您找到一种以矢量化方式进行操作的方法,那么您的 Pandas 代码可能会快 100 倍,您根本不需要 dask.dataframe。
考虑numba
对于您的特定问题,您可以考虑 numba
。这会显着提高您的表现。
In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: s = pd.Series([10000]*120)
In [4]: %paste
def slow_func(k):
A = np.random.normal(size = k) # k = 10000
s = 0
for a in A:
if a > 0:
s += 1
else:
s -= 1
return s
## -- End pasted text --
In [5]: %time _ = s.apply(slow_func)
CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms
Wall time: 347 ms
In [6]: import numba
In [7]: fast_func = numba.jit(slow_func)
In [8]: %time _ = s.apply(fast_func) # First time incurs compilation overhead
CPU times: user 179 ms, sys: 0 ns, total: 179 ms
Wall time: 175 ms
In [9]: %time _ = s.apply(fast_func) # Subsequent times are all gain
CPU times: user 68.8 ms, sys: 27 µs, total: 68.8 ms
Wall time: 68.7 ms
免责声明,我在同时生产 numba
和 dask
的公司工作,并雇用了许多 pandas
开发人员。
从 v dask.dataframe
开始,将责任委托给 map_partitions
:
@insert_meta_param_description(pad=12)
def apply(self, func, convert_dtype=True, meta=no_default, args=(), **kwds):
""" Parallel version of pandas.Series.apply
...
"""
if meta is no_default:
msg = ("`meta` is not specified, inferred from partial data. "
"Please provide `meta` if the result is unexpected.\n"
" Before: .apply(func)\n"
" After: .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result\n"
" or: .apply(func, meta=('x', 'f8')) for series result")
warnings.warn(msg)
meta = _emulate(M.apply, self._meta_nonempty, func,
convert_dtype=convert_dtype,
args=args, **kwds)
return map_partitions(M.apply, self, func,
convert_dtype, args, meta=meta, **kwds)