MinMax Normalization in Scala

I have an org.apache.spark.sql.DataFrame with multiple columns. I want to scale one column (lat_long_dist) using MinMax normalization or any other technique so the data lies between -1 and 1, while keeping the data type as org.apache.spark.sql.DataFrame.

scala> val df = sqlContext.csvFile("tenop.csv")
df: org.apache.spark.sql.DataFrame = [gst_id_matched: string,
  ip_crowding: string, lat_long_dist: double, stream_name_1: string]

I found the StandardScaler option, but that requires transforming the dataset before I can apply it. Is there a simple, clean way to do this?
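
For reference, min-max scaling to [-1, 1] boils down to a one-line formula; here is a minimal plain-Scala sketch (the helper name is just for illustration) that the DataFrame answers below implement column-wise:

// Rescale x from [xMin, xMax] to [-1, 1] via min-max normalization
def minMaxScale(x: Double, xMin: Double, xMax: Double): Double =
  2.0 * (x - xMin) / (xMax - xMin) - 1.0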

I guess what you want is something like this:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{min, max, lit}

val df = sc.parallelize(Seq(
  (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")

val (vMin, vMax) = df.agg(min($"v"), max($"v")).first match {
  case Row(x: Double, y: Double) => (x, y)
}

val scaledRange = lit(2) // Range of the scaled variable
val scaledMin = lit(-1)  // Min value of the scaled variable
val vNormalized = ($"v" - vMin) / (vMax - vMin) // v normalized to (0, 1) range

val vScaled = scaledRange * vNormalized + scaledMin

df.withColumn("vScaled", vScaled).show

// +---+-----+--------------------+
// |  k|    v|             vScaled|
// +---+-----+--------------------+
// |  1|  0.5| -0.3093093093093092|
// |  2| 10.2| 0.27327327327327344|
// |  3|  5.7|0.003003003003003...|
// |  4|-11.0|                -1.0|
// |  5| 22.3|                 1.0|
// +---+-----+--------------------+
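
Applied to the DataFrame from the question, the same approach would look roughly like this (a sketch, assuming df is the DataFrame loaded from tenop.csv with the lat_long_dist: double column):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{min, max, lit}

// Collect the column's min and max, then rescale to [-1, 1] with the same formula
val (dMin, dMax) = df.agg(min($"lat_long_dist"), max($"lat_long_dist")).first match {
  case Row(x: Double, y: Double) => (x, y)
}

val distScaled = lit(2) * (($"lat_long_dist" - dMin) / (dMax - dMin)) + lit(-1)
val dfScaled = df.withColumn("lat_long_dist_scaled", distScaled)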

Here is another suggestion, since you are already using Spark.

Why not use the MinMaxScaler from the ml package?

Let's try it with the same example as zero323's.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.sql.functions.udf

val df = sc.parallelize(Seq(
  (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")

//val df.map(r => Vectors.dense(Array(r.getAs[Double]("v"))))

val vectorizeCol = udf( (v:Double) => Vectors.dense(Array(v)) )
val df2 = df.withColumn("vVec", vectorizeCol(df("v")))

val scaler = new MinMaxScaler()
    .setInputCol("vVec")
    .setOutputCol("vScaled")
    .setMax(1)
    .setMin(-1)

scaler.fit(df2).transform(df2).show
+---+-----+-------+--------------------+
|  k|    v|   vVec|             vScaled|
+---+-----+-------+--------------------+
|  1|  0.5|  [0.5]|[-0.3093093093093...|
|  2| 10.2| [10.2]|[0.27327327327327...|
|  3|  5.7|  [5.7]|[0.00300300300300...|
|  4|-11.0|[-11.0]|              [-1.0]|
|  5| 22.3| [22.3]|               [1.0]|
+---+-----+-------+--------------------+
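
Incidentally, the vectorization and scaling steps can also be chained into a single Pipeline (a sketch; it uses VectorAssembler instead of the UDF above, since Pipeline stages have to be ml Transformers or Estimators):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{MinMaxScaler, VectorAssembler}

val assembler = new VectorAssembler().setInputCols(Array("v")).setOutputCol("vVec")
val mmScaler = new MinMaxScaler().setInputCol("vVec").setOutputCol("vScaled").setMax(1).setMin(-1)

// Fit both stages in one go and apply them
val pipeline = new Pipeline().setStages(Array(assembler, mmScaler))
pipeline.fit(df).transform(df).show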

This has the advantage of scaling multiple columns at once.

val df = sc.parallelize(Seq(
    (1.0, -1.0, 2.0),
    (2.0, 0.0, 0.0),
    (0.0, 1.0, -1.0)
)).toDF("a", "b", "c")

import org.apache.spark.ml.feature.VectorAssembler

val assembler = new VectorAssembler()
    .setInputCols(Array("a", "b", "c"))
    .setOutputCol("features")

val df2 = assembler.transform(df)

// Reusing the scaler instance above with the same min(-1) and max(1) 
scaler.setInputCol("features").setOutputCol("scaledFeatures").fit(df2).transform(df2).show
+---+----+----+--------------+--------------------+
|  a|   b|   c|      features|      scaledFeatures|
+---+----+----+--------------+--------------------+
|1.0|-1.0| 2.0|[1.0,-1.0,2.0]|      [0.0,-1.0,1.0]|
|2.0| 0.0| 0.0| [2.0,0.0,0.0]|[1.0,0.0,-0.33333...|
|0.0| 1.0|-1.0|[0.0,1.0,-1.0]|     [-1.0,1.0,-1.0]|
+---+----+----+--------------+--------------------+

One more solution. The code is taken from Matt, Lyle and zero323, thanks!

import org.apache.spark.ml.feature.{MinMaxScaler, VectorAssembler}

val df = sc.parallelize(Seq(
  (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")

val assembler = new VectorAssembler().setInputCols(Array("v")).setOutputCol("vVec")
val df2 = assembler.transform(df)

val scaler = new MinMaxScaler().setInputCol("vVec").setOutputCol("vScaled").setMax(1).setMin(-1)

scaler.fit(df2).transform(df2).show

Result:

+---+-----+-------+--------------------+
|  k|    v|   vVec|             vScaled|
+---+-----+-------+--------------------+
|  1|  0.5|  [0.5]|[-0.3093093093093...|
|  2| 10.2| [10.2]|[0.27327327327327...|
|  3|  5.7|  [5.7]|[0.00300300300300...|
|  4|-11.0|[-11.0]|              [-1.0]|
|  5| 22.3| [22.3]|               [1.0]|
+---+-----+-------+--------------------+
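
If you need the scaled value back as a plain Double column (to keep the flat DataFrame shape asked for in the question), one way is to unpack the one-element vector, for example like this (a sketch, assuming Spark 3.0+ where vector_to_array is available; on older versions a small UDF returning v(0) does the same):

import org.apache.spark.ml.functions.vector_to_array
import org.apache.spark.sql.functions.col

// Turn the 1-element vector column back into a Double column and drop the helper columns
val result = scaler.fit(df2).transform(df2)
  .withColumn("vScaledDouble", vector_to_array(col("vScaled"))(0))
  .drop("vVec", "vScaled")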

By the way: the other solutions produce an error on my side:

java.lang.IllegalArgumentException: requirement failed: Column vVec must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:43)
  at org.apache.spark.ml.feature.MinMaxScalerParams$class.validateAndTransformSchema(MinMaxScaler.scala:67)
  at org.apache.spark.ml.feature.MinMaxScaler.validateAndTransformSchema(MinMaxScaler.scala:93)
  at org.apache.spark.ml.feature.MinMaxScaler.transformSchema(MinMaxScaler.scala:129)
  at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
  at org.apache.spark.ml.feature.MinMaxScaler.fit(MinMaxScaler.scala:119)
  ... 50 elided
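
A likely cause, if this is Spark 2.x: the UDF above builds org.apache.spark.mllib.linalg vectors, while MinMaxScaler from the ml package expects org.apache.spark.ml.linalg vectors; the two vector UDTs print the same struct schema, which is why the message looks self-contradictory. Swapping the import should fix it, roughly:

// On Spark 2.x, use the ml (not mllib) vector type with ml-package transformers
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.functions.udf

val vectorizeCol = udf((v: Double) => Vectors.dense(v))
val df2 = df.withColumn("vVec", vectorizeCol(df("v")))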

Thanks a lot!