Pyspark 标准定标器 - 排除均值计算的空值

Pyspark standard scaler - excluding null values for mean calculation

我正在尝试将 standardScaler for sparkML 库用于包含空值列的数据框。我想保留空值,但是当我将标准缩放器与均值一起使用时,具有空值的列的均值也变为空值。有什么方法可以使标准缩放器跳过均值计算的空值(如矢量汇编器中的 handleInvalid 选项)?

下面是代码示例

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
sqlContext = SparkSession.builder.appName('test').config("spark.submit.deployMode","client").enableHiveSupport().getOrCreate() 

test_df = sqlContext.createDataFrame([(1,2,None),(1,3,3),(1,4,8),(1,5,7),(1,6,8),
                                  (1,7,1),(1,8,6),(1,9,9),(1,10,3),(1,11,12)],schema=['col1','col2','col3'])
#%%

from pyspark.ml.feature import StringIndexer,VectorIndexer,VectorAssembler,StandardScaler
from pyspark.ml import Pipeline,PipelineModel

assmbler  = VectorAssembler(inputCols=['col2','col3'],outputCol='col_vec',handleInvalid='keep')
sclr = StandardScaler(withMean=True,inputCol='col_vec',outputCol='col_scaled')
pipeline = Pipeline(stages=[assmbler,sclr])
pipe_fit= pipeline.fit(test_df)
df_res = pipe_fit.transform(test_df)

在此之后,如果我尝试获取平均值。

pipe_fit.stages[1].mean
Out[5]: DenseVector([6.5, nan])

如您所见,第二列的平均值为 nan。有什么办法可以避免这种情况?

Spark的StandardScaler的fit method使用Summarizer.metrics("mean", "std")计算列的均值:

val Row(mean: Vector, std: Vector) = dataset
  .select(Summarizer.metrics("mean", "std").summary(col($(inputCol))).as("summary"))
  .select("summary.mean", "summary.std")
  .first()

Summarizer class 本身没有选项可以忽略 nullNaN/None 值,因此没有针对该问题的内置解决方案。

有几种方法可以解决这个问题:

在安装管道之前过滤掉 None 值

test_df = test_df.filter("not col2 is null and not col3 is null")

用常数值替换缺失值

test_df = test_df.fillna(0) #or any other value that is appropriate for the task

使用输入法

向管道添加一个 Imputer 以用特征的均值、中值或最频繁的值替换缺失值:

from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols=['col2', 'col3'], outputCols=['col2i', 'col3i'])
assmbler = VectorAssembler(inputCols=['col2i','col3i'],outputCol='col_vec',handleInvalid='keep')
sclr = StandardScaler(withMean=True,inputCol='col_vec',outputCol='col_scaled')
pipeline = Pipeline(stages=[imputer,assmbler,sclr])
pipe_fit= pipeline.fit(test_df)
df_res = pipe_fit.transform(test_df)

pipe_fit.stages[2].mean现在returns

DenseVector([6.5, 6.3])

因为 col3 中的缺失值已替换为该列的平均值。

对于strategy parameter,可以使用中位数或最常见的值来代替均值,但使用均值是默认值。

在没有 StandardScaler 的情况下缩放所需的列

使用标准 Spark SQL 函数 mean and stddev 可以实现与 StandardScaler 类似的逻辑。 SQL 函数都可以很好地处理 None 值。

cols = ['col2', 'col3'] # the columns that should be scaled
mean_and_std_cols=[c for col in cols for c in 
    (F.mean(col).alias(f"{col}_mean"),F.stddev(col).alias(f"{col}_std"))]
mean_and_std = test_df.select(mean_and_std_cols).first()
scaled_cols=[((F.col(col) - mean_and_std[f"{col}_mean"])
    /mean_and_std[f"{col}_std"]).alias(f"{col}_s") for col in cols]
test_df = test_df.select(test_df.columns + scaled_cols)

此逻辑将 col2_scol3_s 两列添加到包含缩放值的数据框中。 mean_and_std 包含平均值和标准差的实际值:

Row(col2_mean=6.5, col2_std=3.0276503540974917, col3_mean=6.333333333333333, col3_std=3.4641016151377544)

新创建的列 col2_scol3_s 现在可以用作 VectorAssembler 的输入列:

assmbler  = VectorAssembler(inputCols=['col2_s','col3_s'],outputCol='col_vec',handleInvalid='keep')
pipeline = Pipeline(stages=[assmbler])
pipe_fit= ...

此选项在大型数据集上可能比原始缩放器慢一点,因为均值和标准差的值不是近似值,而是精确计算的。