Spark: How to transform values of some selected features in LabeledPoint?
I have an RDD of LabeledPoint and a list of features that I want to transform:
scala> transformedData.collect()
res29: Array[org.apache.spark.mllib.regression.LabeledPoint] = Array((0.0,(400036,[7744],[2.0])), (0.0,(400036,[7744,8608],[3.0,3.0])), (0.0,(400036,[7744],[2.0])), (0.0,(400036,[133,218,2162,7460,7744,9567],[1.0,1.0,2.0,1.0,42.0,21.0])), (0.0,(400036,[133,218,1589,2162,2784,2922,3274,6914,7008,7131,7460,8608,9437,9567,199999,200021,200035,200048,200051,200056,200058,200064,200070,200072,200075,200087,400008,400011],[4.0,1.0,6.0,53.0,6.0,1.0,1.0,2.0,11.0,17.0,48.0,3.0,4.0,113.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,28.0,1.0,1.0,1.0,1.0,1.0,4.0])), (0.0,(400036,[1589,3585,4830,6935,6936,7744,400008,400011],[2.0,6.0,3.0,52.0,4.0,3.0,1.0,2.0])), (0.0,(400036,[1589,2162,2784,2922,4123,7008,7131,7792,8608],[23.0,70.0,1.0,2.0,2.0,1.0,1.0,2.0,2.0])), (0.0,(400036,[4830,6935,6936,400008,400011],[1.0,36.0...
val toTransform = List(124,443,543,211,...
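The transformation I want to apply is:
- take the natural logarithm of (feature value + 1):
new_val=log(val+1)
- divide the new value by the maximum of the new values:
new_val/max(new_val)
(if max is not equal to 0)
How can I apply this transformation to each feature in my toTransform list? (I don't want to create new features, just transform the existing ones.)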
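To pin down the requested semantics, here is a minimal plain-Scala sketch on dense rows (one Array[Double] per row). The names DenseFeatureTransform and transformSelected are hypothetical, and dense arrays would be impractical for 400036-dimensional data like the above; the sketch only illustrates what the result should be: selected columns get log(x + 1) and are divided by their column maximum when it is non-zero, every other column is left untouched, and no columns are added.

```scala
object DenseFeatureTransform {
  // Hypothetical helper: assumes `rows` is non-empty and every index in
  // `selected` is a valid column index for every row.
  def transformSelected(rows: Seq[Array[Double]], selected: Set[Int]): Seq[Array[Double]] = {
    // Step 1: log(x + 1) on the selected columns only (new arrays, inputs are not mutated)
    val logged = rows.map { row =>
      row.zipWithIndex.map { case (x, i) => if (selected(i)) math.log1p(x) else x }
    }
    // Step 2: per-column maximum of the transformed values
    val maxes: Map[Int, Double] = selected.map(i => i -> logged.map(_(i)).max).toMap
    // Step 3: divide by the column maximum, but only when it is non-zero
    logged.map { row =>
      row.zipWithIndex.map { case (x, i) =>
        maxes.get(i) match {
          case Some(m) if m != 0.0 => x / m
          case _                   => x
        }
      }
    }
  }
}
```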
@zero323 is right, you'd be better off flattening your LabeledPoints; then you can do the following:
// assumes spark-shell, where sc and the implicits for toDF and $"..." are already imported
import org.apache.spark.sql.functions.{max, udf}
// create a UDF that applies log(1 + c) and divides by the given max
def transform(max: Double) = udf[Double, Double] { c => Math.log1p(c) / max }
// create dummy data
val df = sc.parallelize(Seq(1, 2, 3, 4, 5, 4, 3, 2, 1)).toDF("feature")
// get the max value of the feature
val maxFeat = df.agg(max($"feature")).first.getInt(0)
// apply the transformation to the feature column
// note: this divides by the max of the raw values (5 here), not by the max of log(1 + value)
val newDf = df.withColumn("norm", transform(maxFeat)($"feature"))
newDf.show
// +-------+-------------------+
// |feature|               norm|
// +-------+-------------------+
// |      1|0.13862943611198905|
// |      2|0.21972245773362192|
// |      3| 0.2772588722239781|
// |      4|0.32188758248682003|
// |      5|  0.358351893845611|
// |      4|0.32188758248682003|
// |      3| 0.2772588722239781|
// |      2|0.21972245773362192|
// |      1|0.13862943611198905|
// +-------+-------------------+
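The UDF above divides log(1 + c) by the maximum of the raw feature (for c = 1 the table shows log(2) / 5 ≈ 0.1386), while the question asks to divide by the maximum of the transformed values. Since log(1 + x) is increasing, that maximum is simply log(1 + max(c)). A minimal plain-Scala comparison of the two normalizations; NormalizationCheck and its methods are hypothetical names used only for illustration:

```scala
object NormalizationCheck {
  // What the UDF above computes: log(1 + c) divided by the max of the raw feature
  def byRawMax(c: Double, rawMax: Double): Double = math.log1p(c) / rawMax

  // What the question asks for: divide by the max of the transformed values.
  // log1p is increasing, so max(log1p(c)) == log1p(max(c)); skip the division when it is 0.
  def byTransformedMax(c: Double, rawMax: Double): Double = {
    val m = math.log1p(rawMax)
    if (m != 0.0) math.log1p(c) / m else math.log1p(c)
  }

  def main(args: Array[String]): Unit = {
    val features = Seq(1.0, 2.0, 3.0, 4.0, 5.0)
    val rawMax = features.max
    for (c <- features)
      println(f"$c%.0f raw max: ${byRawMax(c, rawMax)}%.4f  transformed max: ${byTransformedMax(c, rawMax)}%.4f")
  }
}
```

In the Spark snippet, passing Math.log1p(maxFeat) to transform instead of maxFeat should give the second behaviour, with the largest feature value mapping to 1.0 (a zero maximum would still need the guard shown in byTransformedMax).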
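This is possible, but not exactly straightforward. If you can transform the values before you assemble the vectors and labeled points, the answer provided by @eliasah should do the trick. Otherwise you have to do it the hard way. Let's assume your data looks like this: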
import org.apache.spark.mllib.linalg.{Vector, Vectors, SparseVector, DenseVector}
import org.apache.spark.mllib.regression.LabeledPoint
val points = sc.parallelize(Seq(
LabeledPoint(1.0, Vectors.sparse(6, Array(1, 4, 5), Array(2.0, 6.0, 3.0))),
LabeledPoint(2.0, Vectors.sparse(6, Array(2, 3), Array(0.1, 1.0)))
))
Next, define a small helper:
import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
// wrap an MLlib vector as a Breeze vector (the underlying arrays are shared, not copied)
def toBreeze(v: Vector): BV[Double] = v match {
case DenseVector(values) => new BDV[Double](values)
case SparseVector(size, indices, values) => {
new BSV[Double](indices, values, size)
}
}
Then disassemble the LabeledPoints as follows:
val pairs = points.map(lp => (lp.label, toBreeze(lp.features)))
Now we can define the transformation function:
// apply log(x + 1) to the selected indices, updating v in place
def transform(indices: Seq[Int])(v: BV[Double]) = {
for(i <- indices) v(i) = breeze.numerics.log(v(i) + 1.0)
v
}
and transform the pairs:
val indices = Array(2, 4)
val transformed = pairs.mapValues(transform(indices))
After that, compute the element-wise maximum over all vectors and divide by it:
val maxV = transformed.values.reduce(breeze.linalg.max(_, _))
// divide each selected index by its maximum (skipping zero maxima), updating v in place
def divideByMax(m: BV[Double], indices: Seq[Int])(v: BV[Double]) = {
for (i <- indices) if(m(i) != 0) v(i) /= m(i)
v
}
val divided = transformed.mapValues(divideByMax(maxV, indices))
Finally, map back to LabeledPoints:
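def toSpark(v: BV[Double]) = v match {
case v: BDV[Double] => new DenseVector(v.toArray)
// a Breeze SparseVector can keep spare capacity after insertions,
// so only the first `used` entries of index/data are valid
case v: BSV[Double] => new SparseVector(v.length, v.index.take(v.used), v.data.take(v.used))
}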
divided.map{case (l, v) => LabeledPoint(l, toSpark(v))}
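If you'd rather skip Breeze, the same three steps can be done directly on the sparse (size, indices, values) representation, returning new arrays instead of mutating in place. The sketch below uses a hypothetical case class Sparse as a stand-in for MLlib's SparseVector so it runs without Spark. In Spark you would presumably read the fields of lp.features (assuming it is a SparseVector), rebuild with Vectors.sparse(size, indices, newValues), and compute the per-feature maximum with something like flatMap followed by reduceByKey(math.max(_, _)) instead of the local fold used here.

```scala
object SparseSelectedNormalize {
  // Hypothetical stand-in for org.apache.spark.mllib.linalg.SparseVector
  final case class Sparse(size: Int, indices: Array[Int], values: Array[Double])

  // Step 1: log(x + 1) on stored entries whose index is selected.
  // Entries that are not stored are implicitly 0 and log(0 + 1) == 0,
  // so they need no update and the sparsity pattern is unchanged.
  def log1pSelected(v: Sparse, selected: Set[Int]): Sparse =
    v.copy(values = v.indices.zip(v.values).map { case (i, x) =>
      if (selected(i)) math.log1p(x) else x
    })

  // Step 2: per-feature maximum over all rows, visiting stored entries only.
  // Assumes values are non-negative (true after log(x + 1) of non-negative inputs),
  // so ignoring the implicit zeros does not change the maximum.
  def selectedMax(rows: Seq[Sparse], selected: Set[Int]): Map[Int, Double] =
    rows.iterator
      .flatMap(r => r.indices.iterator.zip(r.values.iterator))
      .filter { case (i, _) => selected(i) }
      .foldLeft(Map.empty[Int, Double]) { case (acc, (i, x)) =>
        acc.updated(i, acc.get(i).fold(x)(m => math.max(m, x)))
      }

  // Step 3: divide selected entries by their feature maximum, skipping a maximum of 0.
  def divideByMax(v: Sparse, maxes: Map[Int, Double]): Sparse =
    v.copy(values = v.indices.zip(v.values).map { case (i, x) =>
      maxes.get(i) match {
        case Some(m) if m != 0.0 => x / m
        case _                   => x
      }
    })

  def main(args: Array[String]): Unit = {
    // same two vectors as the LabeledPoint example above, with indices 2 and 4 selected
    val rows = Seq(
      Sparse(6, Array(1, 4, 5), Array(2.0, 6.0, 3.0)),
      Sparse(6, Array(2, 3), Array(0.1, 1.0)))
    val selected = Set(2, 4)
    val logged = rows.map(log1pSelected(_, selected))
    val maxes = selectedMax(logged, selected)
    // prints "2.0, 1.0, 3.0" then "1.0, 1.0"
    logged.map(divideByMax(_, maxes)).foreach(r => println(r.values.mkString(", ")))
  }
}
```

With only two rows, each selected feature is stored in a single row, so it is its own maximum and normalizes to 1.0; unselected entries pass through unchanged.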