Pyspark 标准定标器 - 排除均值计算的空值
Pyspark standard scaler - excluding null values for mean calculation
我正在尝试将 standardScaler for sparkML 库用于包含空值列的数据框。我想保留空值,但是当我将标准缩放器与均值一起使用时,具有空值的列的均值也变为空值。有什么方法可以使标准缩放器跳过均值计算的空值(如矢量汇编器中的 handleInvalid 选项)?
下面是代码示例
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
sqlContext = SparkSession.builder.appName('test').config("spark.submit.deployMode","client").enableHiveSupport().getOrCreate()
test_df = sqlContext.createDataFrame([(1,2,None),(1,3,3),(1,4,8),(1,5,7),(1,6,8),
(1,7,1),(1,8,6),(1,9,9),(1,10,3),(1,11,12)],schema=['col1','col2','col3'])
#%%
from pyspark.ml.feature import StringIndexer,VectorIndexer,VectorAssembler,StandardScaler
from pyspark.ml import Pipeline,PipelineModel
assmbler = VectorAssembler(inputCols=['col2','col3'],outputCol='col_vec',handleInvalid='keep')
sclr = StandardScaler(withMean=True,inputCol='col_vec',outputCol='col_scaled')
pipeline = Pipeline(stages=[assmbler,sclr])
pipe_fit= pipeline.fit(test_df)
df_res = pipe_fit.transform(test_df)
在此之后,如果我尝试获取平均值。
pipe_fit.stages[1].mean
Out[5]: DenseVector([6.5, nan])
如您所见,第二列的平均值为 nan。有什么办法可以避免这种情况?
Spark的StandardScaler的fit method使用Summarizer.metrics("mean", "std")
计算列的均值:
val Row(mean: Vector, std: Vector) = dataset
.select(Summarizer.metrics("mean", "std").summary(col($(inputCol))).as("summary"))
.select("summary.mean", "summary.std")
.first()
Summarizer class 本身没有选项可以忽略 null
或 NaN
/None
值,因此没有针对该问题的内置解决方案。
有几种方法可以解决这个问题:
在安装管道之前过滤掉 None 值
test_df = test_df.filter("not col2 is null and not col3 is null")
用常数值替换缺失值
test_df = test_df.fillna(0) #or any other value that is appropriate for the task
使用输入法
向管道添加一个 Imputer 以用特征的均值、中值或最频繁的值替换缺失值:
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols=['col2', 'col3'], outputCols=['col2i', 'col3i'])
assmbler = VectorAssembler(inputCols=['col2i','col3i'],outputCol='col_vec',handleInvalid='keep')
sclr = StandardScaler(withMean=True,inputCol='col_vec',outputCol='col_scaled')
pipeline = Pipeline(stages=[imputer,assmbler,sclr])
pipe_fit= pipeline.fit(test_df)
df_res = pipe_fit.transform(test_df)
pipe_fit.stages[2].mean
现在returns
DenseVector([6.5, 6.3])
因为 col3
中的缺失值已替换为该列的平均值。
对于strategy parameter,可以使用中位数或最常见的值来代替均值,但使用均值是默认值。
在没有 StandardScaler 的情况下缩放所需的列
使用标准 Spark SQL 函数 mean and stddev 可以实现与 StandardScaler 类似的逻辑。 SQL 函数都可以很好地处理 None
值。
cols = ['col2', 'col3'] # the columns that should be scaled
mean_and_std_cols=[c for col in cols for c in
(F.mean(col).alias(f"{col}_mean"),F.stddev(col).alias(f"{col}_std"))]
mean_and_std = test_df.select(mean_and_std_cols).first()
scaled_cols=[((F.col(col) - mean_and_std[f"{col}_mean"])
/mean_and_std[f"{col}_std"]).alias(f"{col}_s") for col in cols]
test_df = test_df.select(test_df.columns + scaled_cols)
此逻辑将 col2_s
和 col3_s
两列添加到包含缩放值的数据框中。 mean_and_std
包含平均值和标准差的实际值:
Row(col2_mean=6.5, col2_std=3.0276503540974917, col3_mean=6.333333333333333, col3_std=3.4641016151377544)
新创建的列 col2_s
和 col3_s
现在可以用作 VectorAssembler 的输入列:
assmbler = VectorAssembler(inputCols=['col2_s','col3_s'],outputCol='col_vec',handleInvalid='keep')
pipeline = Pipeline(stages=[assmbler])
pipe_fit= ...
此选项在大型数据集上可能比原始缩放器慢一点,因为均值和标准差的值不是近似值,而是精确计算的。
我正在尝试将 standardScaler for sparkML 库用于包含空值列的数据框。我想保留空值,但是当我将标准缩放器与均值一起使用时,具有空值的列的均值也变为空值。有什么方法可以使标准缩放器跳过均值计算的空值(如矢量汇编器中的 handleInvalid 选项)?
下面是代码示例
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
sqlContext = SparkSession.builder.appName('test').config("spark.submit.deployMode","client").enableHiveSupport().getOrCreate()
test_df = sqlContext.createDataFrame([(1,2,None),(1,3,3),(1,4,8),(1,5,7),(1,6,8),
(1,7,1),(1,8,6),(1,9,9),(1,10,3),(1,11,12)],schema=['col1','col2','col3'])
#%%
from pyspark.ml.feature import StringIndexer,VectorIndexer,VectorAssembler,StandardScaler
from pyspark.ml import Pipeline,PipelineModel
assmbler = VectorAssembler(inputCols=['col2','col3'],outputCol='col_vec',handleInvalid='keep')
sclr = StandardScaler(withMean=True,inputCol='col_vec',outputCol='col_scaled')
pipeline = Pipeline(stages=[assmbler,sclr])
pipe_fit= pipeline.fit(test_df)
df_res = pipe_fit.transform(test_df)
在此之后,如果我尝试获取平均值。
pipe_fit.stages[1].mean
Out[5]: DenseVector([6.5, nan])
如您所见,第二列的平均值为 nan。有什么办法可以避免这种情况?
Spark的StandardScaler的fit method使用Summarizer.metrics("mean", "std")
计算列的均值:
val Row(mean: Vector, std: Vector) = dataset
.select(Summarizer.metrics("mean", "std").summary(col($(inputCol))).as("summary"))
.select("summary.mean", "summary.std")
.first()
Summarizer class 本身没有选项可以忽略 null
或 NaN
/None
值,因此没有针对该问题的内置解决方案。
有几种方法可以解决这个问题:
在安装管道之前过滤掉 None 值
test_df = test_df.filter("not col2 is null and not col3 is null")
用常数值替换缺失值
test_df = test_df.fillna(0) #or any other value that is appropriate for the task
使用输入法
向管道添加一个 Imputer 以用特征的均值、中值或最频繁的值替换缺失值:
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols=['col2', 'col3'], outputCols=['col2i', 'col3i'])
assmbler = VectorAssembler(inputCols=['col2i','col3i'],outputCol='col_vec',handleInvalid='keep')
sclr = StandardScaler(withMean=True,inputCol='col_vec',outputCol='col_scaled')
pipeline = Pipeline(stages=[imputer,assmbler,sclr])
pipe_fit= pipeline.fit(test_df)
df_res = pipe_fit.transform(test_df)
pipe_fit.stages[2].mean
现在returns
DenseVector([6.5, 6.3])
因为 col3
中的缺失值已替换为该列的平均值。
对于strategy parameter,可以使用中位数或最常见的值来代替均值,但使用均值是默认值。
在没有 StandardScaler 的情况下缩放所需的列
使用标准 Spark SQL 函数 mean and stddev 可以实现与 StandardScaler 类似的逻辑。 SQL 函数都可以很好地处理 None
值。
cols = ['col2', 'col3'] # the columns that should be scaled
mean_and_std_cols=[c for col in cols for c in
(F.mean(col).alias(f"{col}_mean"),F.stddev(col).alias(f"{col}_std"))]
mean_and_std = test_df.select(mean_and_std_cols).first()
scaled_cols=[((F.col(col) - mean_and_std[f"{col}_mean"])
/mean_and_std[f"{col}_std"]).alias(f"{col}_s") for col in cols]
test_df = test_df.select(test_df.columns + scaled_cols)
此逻辑将 col2_s
和 col3_s
两列添加到包含缩放值的数据框中。 mean_and_std
包含平均值和标准差的实际值:
Row(col2_mean=6.5, col2_std=3.0276503540974917, col3_mean=6.333333333333333, col3_std=3.4641016151377544)
新创建的列 col2_s
和 col3_s
现在可以用作 VectorAssembler 的输入列:
assmbler = VectorAssembler(inputCols=['col2_s','col3_s'],outputCol='col_vec',handleInvalid='keep')
pipeline = Pipeline(stages=[assmbler])
pipe_fit= ...
此选项在大型数据集上可能比原始缩放器慢一点,因为均值和标准差的值不是近似值,而是精确计算的。