How to use spark quantilediscretizer on multiple columns

All,

I have an ML pipeline set up as follows:

import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.types.{StructType,StructField,DoubleType}    
import org.apache.spark.ml.Pipeline
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import scala.util.Random

val nRows = 10000
val nCols = 1000
val data = sc.parallelize(0 to nRows-1).map { _ => Row.fromSeq(Seq.fill(nCols)(Random.nextDouble)) }
val schema = StructType((0 to nCols-1).map { i => StructField("C" + i, DoubleType, true) } )
val df = spark.createDataFrame(data, schema)
df.cache()

// Get the continuous feature names and discretize them
val continuous = df.dtypes.filter(_._2 == "DoubleType").map (_._1)
val discretizers = continuous.map(c =>
  new QuantileDiscretizer()
    .setInputCol(c)
    .setOutputCol(s"${c}_disc")
    .setNumBuckets(3)
    .fit(df))
val pipeline = new Pipeline().setStages(discretizers)
val model = pipeline.fit(df)

When I run this, Spark seems to schedule each discretizer as a separate job. Is there a way to run all of the discretizers as a single job, with or without a pipeline? Any help is appreciated, thanks.

import org.apache.spark.ml.feature.QuantileDiscretizer

val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")

val discretizer = new QuantileDiscretizer()
.setInputCol("hour")
.setOutputCol("result")
.setNumBuckets(3)

val result = discretizer.fit(df).transform(df)
result.show()
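For reference, with those five rows and three buckets the output comes out like this (the splits are computed by approxQuantile, so treat the exact bucket boundaries as approximate):

+---+----+------+
| id|hour|result|
+---+----+------+
|  0|18.0|   2.0|
|  1|19.0|   2.0|
|  2| 8.0|   1.0|
|  3| 5.0|   1.0|
|  4| 2.2|   0.0|
+---+----+------+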

Taken from the QuantileDiscretizer documentation.

That runs as a single job against a single column; below, it also runs as a single job, but against multiple columns:

def discretizerFun(col: String, bucketNo: Int): QuantileDiscretizer = {
  new QuantileDiscretizer()
    .setInputCol(col)
    .setOutputCol(s"${col}_result")
    .setNumBuckets(bucketNo)
}


val data = Array((0, 18.0, 2.1), (1, 19.0, 14.1), (2, 8.0, 63.7), (3, 5.0, 88.3), (4, 2.2, 0.8))

val df = spark.createDataFrame(data).toDF("id", "hour", "temp")

// Fit and transform the "hour" discretizer once, then feed its output to the "temp" one
val hourDiscretized = discretizerFun("hour", 2).fit(df).transform(df)
val res = discretizerFun("temp", 4).fit(hourDiscretized).transform(hourDiscretized)

It would be nicest to turn that function into a UDF, but that may be a problem with the org.apache.spark.ml.feature.QuantileDiscretizer type; if you can pull it off, it gives you a neat, concise way of doing lazy transformations.
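In the meantime, here is a minimal sketch of what the chaining can look like without a UDF, assuming the discretizerFun above; the column/bucket pairs are illustrative:

// Sketch: fold per-column discretizers over the DataFrame. Each fit() still
// triggers its own approxQuantile job, but the transform()s stay lazy and the
// chain reads linearly instead of nesting.
val colBuckets = Seq("hour" -> 2, "temp" -> 4) // illustrative column/bucket pairs
val resAll = colBuckets.foldLeft(df) { case (acc, (c, n)) =>
  discretizerFun(c, n).fit(acc).transform(acc)
}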

Support for this was added in Spark 2.3.0. See the release docs:

  • Multiple column support for several feature transformers:
    • [SPARK-13030]: OneHotEncoderEstimator (Scala/Java/Python)
    • [SPARK-22397]: QuantileDiscretizer (Scala/Java)
    • [SPARK-20542]: Bucketizer (Scala/Java/Python)

You can now use setInputCols and setOutputCols to specify multiple columns, although this does not seem to be reflected in the official docs yet. Performance with this patch is greatly improved compared with running one job per column.

Your example can be modified as follows:

import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.types.{StructType,StructField,DoubleType}    
import org.apache.spark.ml.Pipeline
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import scala.util.Random

val nRows = 10000
val nCols = 1000
val data = sc.parallelize(0 to nRows-1).map { _ => Row.fromSeq(Seq.fill(nCols)(Random.nextDouble)) }
val schema = StructType((0 to nCols-1).map { i => StructField("C" + i, DoubleType, true) } )
val df = spark.createDataFrame(data, schema)
df.cache()

// Get the continuous feature names and discretize them
val continuous = df.dtypes.filter(_._2 == "DoubleType").map (_._1)

val discretizer = new QuantileDiscretizer()
  .setInputCols(continuous)
  .setOutputCols(continuous.map(c => s"${c}_disc"))
  .setNumBuckets(3)

val pipeline = new Pipeline().setStages(Array(discretizer))
val model = pipeline.fit(df)
model.transform(df)
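As a follow-up, the Scala/Java API also lets you give each column its own bucket count. A minimal sketch, assuming Spark 2.3.0+ and the continuous array from above:

// Sketch: per-column bucket counts via setNumBucketsArray (Scala/Java only),
// instead of a single shared setNumBuckets. The 3s here are placeholders.
val perColDiscretizer = new QuantileDiscretizer()
  .setInputCols(continuous)
  .setOutputCols(continuous.map(c => s"${c}_disc"))
  .setNumBucketsArray(continuous.map(_ => 3)) // replace with real per-column counts

perColDiscretizer.fit(df).transform(df).show(5)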