如何将一个函数并行应用于 Dask Data Frame 的多列?
How to apply a function to multiple columns of a Dask Data Frame in parallel?
我有一个 Dask Dataframe,我想为它计算列列表的偏度,如果这个偏度超过某个阈值,我会使用对数转换来纠正它。我想知道是否有更有效的方法通过删除下面 correct_skewness()
函数中的 for 循环 来使 correct_skewness()
函数在多个列上并行工作:
import dask
import dask.array as da
from scipy import stats
# Create a dataframe
df = dask.datasets.timeseries()
df.head()
id name x y
timestamp
2000-01-01 00:00:00 1032 Oliver 0.018604 0.089191
2000-01-01 00:00:01 1032 Norbert 0.666689 -0.979374
2000-01-01 00:00:02 991 Victor 0.027691 -0.474660
2000-01-01 00:00:03 979 Kevin 0.320067 0.656949
2000-01-01 00:00:04 1087 Zelda -0.462076 0.513409
def correct_skewness(columns=None, max_skewness=2):
if columns is None:
raise ValueError(
f"columns argument is None. Please set columns argument to a list of columns"
)
for col in columns:
skewness = stats.skew(df[col])
max_val = df[col].max().compute()
min_val = df[col].min().compute()
if abs(skewness) > max_skewness and (max_val > 1 or min_val < 0):
delta = 1.0
if min_val < 0:
delta = max(1, -min_val + 1)
df[col] = da.log(delta + df[col])
return df
df = correct_skewness(columns=['x', 'y'])
在此示例中,您可以采取一些措施来提高并行度:
您可以使用 dask.array.stats.skew 而不是 statsmodels.skew。您将必须import dask.array.stats
明确
您可以在一次计算中计算所有列的 min/max
mins = [df[col].min() for col in cols]
maxes = [df[col].min() for col in cols]
skews = [da.stats.skew(df[col]) for col in cols]
mins, maxes, skews = dask.compute(mins, maxes, skews)
然后您可以执行您的 if 逻辑并根据需要应用 da.log
。这仍然需要对您的数据进行两次传递,但这应该是对您现在拥有的数据的一个很好的改进。
我有一个 Dask Dataframe,我想为它计算列列表的偏度,如果这个偏度超过某个阈值,我会使用对数转换来纠正它。我想知道是否有更有效的方法通过删除下面 correct_skewness()
函数中的 for 循环 来使 correct_skewness()
函数在多个列上并行工作:
import dask
import dask.array as da
from scipy import stats
# Create a dataframe
df = dask.datasets.timeseries()
df.head()
id name x y
timestamp
2000-01-01 00:00:00 1032 Oliver 0.018604 0.089191
2000-01-01 00:00:01 1032 Norbert 0.666689 -0.979374
2000-01-01 00:00:02 991 Victor 0.027691 -0.474660
2000-01-01 00:00:03 979 Kevin 0.320067 0.656949
2000-01-01 00:00:04 1087 Zelda -0.462076 0.513409
def correct_skewness(columns=None, max_skewness=2):
if columns is None:
raise ValueError(
f"columns argument is None. Please set columns argument to a list of columns"
)
for col in columns:
skewness = stats.skew(df[col])
max_val = df[col].max().compute()
min_val = df[col].min().compute()
if abs(skewness) > max_skewness and (max_val > 1 or min_val < 0):
delta = 1.0
if min_val < 0:
delta = max(1, -min_val + 1)
df[col] = da.log(delta + df[col])
return df
df = correct_skewness(columns=['x', 'y'])
在此示例中,您可以采取一些措施来提高并行度:
您可以使用 dask.array.stats.skew 而不是 statsmodels.skew。您将必须import dask.array.stats
明确
您可以在一次计算中计算所有列的 min/max
mins = [df[col].min() for col in cols]
maxes = [df[col].min() for col in cols]
skews = [da.stats.skew(df[col]) for col in cols]
mins, maxes, skews = dask.compute(mins, maxes, skews)
然后您可以执行您的 if 逻辑并根据需要应用 da.log
。这仍然需要对您的数据进行两次传递,但这应该是对您现在拥有的数据的一个很好的改进。