如何在 Pandas/Dask 中按具有可变 bin 的列对大型数据帧进行离散化
How to discretize large dataframe by columns with variable bins in Pandas/Dask
我可以使用以下代码按列离散化 Pandas 数据帧:
import numpy as np
import pandas as pd
def discretize(X, n_scale=1):
for c in X.columns:
loc = X[c].median()
# median absolute deviation of the column
scale = mad(X[c])
bins = [-np.inf, loc - (scale * n_scale),
loc + (scale * n_scale), np.inf]
X[c] = pd.cut(X[c], bins, labels=[-1, 0, 1])
return X
我想使用以下参数离散化每一列:loc(列的中位数)和 scale(列的 median absolute deviation)。
对于小数据帧,所需时间是可以接受的(因为它是单线程解决方案)。
但是,对于更大的数据帧,我想利用更多线程(或进程)来加速计算。
我不是Dask方面的专家,应该可以解决这个问题
然而,在我的例子中,离散化应该是可行的代码:
import dask.dataframe as dd
import numpy as np
import pandas as pd
def discretize(X, n_scale=1):
# I'm using only 2 partitions for this example
X_dask = dd.from_pandas(X, npartitions=2)
# FIXME:
# how can I define bins to compute loc and scale
# for each column?
bins = [-np.inf, loc - (scale * n_scale),
loc + (scale * n_scale), np.inf]
X = X_dask.apply(pd.cut, axis=1, args=(bins,), labels=[-1, 0, 1]).compute()
return X
但这里的问题是 loc
和 scale
取决于列值,因此应该在应用之前或期间为每一列计算它们。
如何做到?
我从未使用过 dask
,但我想您可以定义一个新函数以在 apply
中使用。
import dask.dataframe as dd
import multiprocessing as mp
import numpy as np
import pandas as pd
def discretize(X, n_scale=1):
X_dask = dd.from_pandas(X.T, npartitions=mp.cpu_count()+1)
X = X_dask.apply(_discretize_series,
axis=1, args=(n_scale,),
columns=X.columns).compute().T
return X
def _discretize_series(x, n_scale=1):
loc = x.median()
scale = mad(x)
bins = [-np.inf, loc - (scale * n_scale),
loc + (scale * n_scale), np.inf]
x = pd.cut(x, bins, labels=[-1, 0, 1])
return x
我可以使用以下代码按列离散化 Pandas 数据帧:
import numpy as np
import pandas as pd
def discretize(X, n_scale=1):
for c in X.columns:
loc = X[c].median()
# median absolute deviation of the column
scale = mad(X[c])
bins = [-np.inf, loc - (scale * n_scale),
loc + (scale * n_scale), np.inf]
X[c] = pd.cut(X[c], bins, labels=[-1, 0, 1])
return X
我想使用以下参数离散化每一列:loc(列的中位数)和 scale(列的 median absolute deviation)。
对于小数据帧,所需时间是可以接受的(因为它是单线程解决方案)。
但是,对于更大的数据帧,我想利用更多线程(或进程)来加速计算。
我不是Dask方面的专家,应该可以解决这个问题
然而,在我的例子中,离散化应该是可行的代码:
import dask.dataframe as dd
import numpy as np
import pandas as pd
def discretize(X, n_scale=1):
# I'm using only 2 partitions for this example
X_dask = dd.from_pandas(X, npartitions=2)
# FIXME:
# how can I define bins to compute loc and scale
# for each column?
bins = [-np.inf, loc - (scale * n_scale),
loc + (scale * n_scale), np.inf]
X = X_dask.apply(pd.cut, axis=1, args=(bins,), labels=[-1, 0, 1]).compute()
return X
但这里的问题是 loc
和 scale
取决于列值,因此应该在应用之前或期间为每一列计算它们。
如何做到?
我从未使用过 dask
,但我想您可以定义一个新函数以在 apply
中使用。
import dask.dataframe as dd
import multiprocessing as mp
import numpy as np
import pandas as pd
def discretize(X, n_scale=1):
X_dask = dd.from_pandas(X.T, npartitions=mp.cpu_count()+1)
X = X_dask.apply(_discretize_series,
axis=1, args=(n_scale,),
columns=X.columns).compute().T
return X
def _discretize_series(x, n_scale=1):
loc = x.median()
scale = mad(x)
bins = [-np.inf, loc - (scale * n_scale),
loc + (scale * n_scale), np.inf]
x = pd.cut(x, bins, labels=[-1, 0, 1])
return x