如何在列的子集上实施 PySpark StandardScaler?

How to implement PySpark StandardScaler on subset of columns?

我想在数据框中的 10 列中的 6 列上使用 pyspark StandardScaler。这将是管道的一部分。

inputCol 参数似乎需要一个向量,我可以在对我的所有特征使用 VectorAssembler 后传入该向量,但这会缩放所有 10 个特征。我不想缩放其他 4 个特征,因为它们是二进制的,我想要它们的非标准化系数。

我是否应该在 6 个特征上使用向量组装器,缩放它们,然后在这个缩放后的特征向量和其余 4 个特征上再次使用向量组装器?我最终会得到一个向量中的向量,但我不确定这是否可行。

正确的做法是什么?举个例子表示赞赏。

您可以使用 VectorAssembler 来完成此操作。它们的关键是您必须从汇编器输出中提取列。有关工作示例,请参见下面的代码,

from pyspark.ml.feature import MinMaxScaler, StandardScaler
from pyspark.ml.feature import VectorAssembler
import pandas as pd
import numpy as np
import random

df = pd.DataFrame()
df['a'] = random.sample(range(100), 10)
df['b'] = random.sample(range(100), 10)
df['c'] = random.sample(range(100), 10)
df['d'] = random.sample(range(100), 10)
df['e'] = random.sample(range(100), 10)

sdf = sc.createDataFrame(df)

sdf.show()

+---+---+---+---+---+
|  a|  b|  c|  d|  e|
+---+---+---+---+---+
| 51| 13|  6|  5| 26|
| 18| 29| 19| 81| 28|
| 34|  1| 36| 57| 87|
| 56| 86| 51| 52| 48|
| 36| 49| 33| 15| 54|
| 87| 53| 47| 89| 85|
|  7| 14| 55| 13| 98|
| 70| 50| 32| 39| 58|
| 80| 20| 25| 54| 37|
| 40| 33| 44| 83| 27|
+---+---+---+---+---+

cols_to_scale = ['c', 'd', 'e']
cols_to_keep_unscaled = ['a', 'b']

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
assembler = VectorAssembler().setInputCols(cols_to_scale).setOutputCol("features")
sdf_transformed = assembler.transform(sdf)
scaler_model = scaler.fit(sdf_transformed.select("features"))
sdf_scaled = scaler_model.transform(sdf_transformed)

sdf_scaled.show()

+---+---+---+---+---+----------------+--------------------+
|  a|  b|  c|  d|  e|        features|      scaledFeatures|
+---+---+---+---+---+----------------+--------------------+
| 51| 13|  6|  5| 26|  [6.0,5.0,26.0]|[0.39358015146628...|
| 18| 29| 19| 81| 28|[19.0,81.0,28.0]|[1.24633714630991...|
| 34|  1| 36| 57| 87|[36.0,57.0,87.0]|[2.36148090879773...|
| 56| 86| 51| 52| 48|[51.0,52.0,48.0]|[3.34543128746345...|
| 36| 49| 33| 15| 54|[33.0,15.0,54.0]|[2.16469083306459...|
| 87| 53| 47| 89| 85|[47.0,89.0,85.0]|[3.08304451981926...|
|  7| 14| 55| 13| 98|[55.0,13.0,98.0]|[3.60781805510765...|
| 70| 50| 32| 39| 58|[32.0,39.0,58.0]|[2.09909414115354...|
| 80| 20| 25| 54| 37|[25.0,54.0,37.0]|[1.63991729777620...|
| 40| 33| 44| 83| 27|[44.0,83.0,27.0]|[2.88625444408612...|
+---+---+---+---+---+----------------+--------------------+

# Function just to convert to help build data frame
def extract(row):
  return (row.a, row.b,) + tuple(row.scaledFeatures.toArray().tolist())

sdf_scaled = sdf_scaled.select(*cols_to_keep_unscaled, "scaledFeatures").rdd \
        .map(extract).toDF(cols_to_keep_unscaled + cols_to_scale)
  
  
sdf_scaled.show()


+---+---+------------------+-------------------+------------------+
|  a|  b|                 c|                  d|                 e|
+---+---+------------------+-------------------+------------------+
| 51| 13|0.3935801514662892|0.16399957083190683|0.9667572801316145|
| 18| 29| 1.246337146309916|  2.656793047476891|1.0411232247571234|
| 34|  1|2.3614809087977355| 1.8695951074837378|3.2349185912096337|
| 56| 86|3.3454312874634584| 1.7055955366518312|1.7847826710122114|
| 36| 49| 2.164690833064591|0.49199871249572047| 2.007880504888738|
| 87| 53| 3.083044519819266| 2.9191923608079415|3.1605526465841245|
|  7| 14|3.6078180551076513| 0.4263988841629578| 3.643931286649932|
| 70| 50|2.0990941411535426| 1.2791966524888734|2.1566123941397555|
| 80| 20| 1.639917297776205| 1.7711953649845937| 1.375769975571913|
| 40| 33|2.8862544440861213| 2.7223928758096534| 1.003940252444369|
+---+---+------------------+-------------------+------------------+