如何在多列上使用 spark quantilediscretizer
How to use spark quantilediscretizer on multiple columns
大家好，
我有如下的 ml 管道设置
import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.types.{StructType,StructField,DoubleType}
import org.apache.spark.ml.Pipeline
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import scala.util.Random
// Synthetic dataset: nRows x nCols random Doubles, columns named C0..C{nCols-1}.
val nRows = 10000
val nCols = 1000
val data = sc.parallelize(0 to nRows-1).map { _ => Row.fromSeq(Seq.fill(nCols)(Random.nextDouble)) }
val schema = StructType((0 to nCols-1).map { i => StructField("C" + i, DoubleType, true) } )
val df = spark.createDataFrame(data, schema)
df.cache()
//Get continuous feature name and discretize them
val continuous = df.dtypes.filter(_._2 == "DoubleType").map (_._1)
// NOTE(review): each .fit(df) in this map executes eagerly, once per column —
// this is the source of the one-job-per-discretizer behavior described below.
val discretizers = continuous.map(c => new QuantileDiscretizer().setInputCol(c).setOutputCol(s"${c}_disc").setNumBuckets(3).fit(df))
// The stages passed here are the already-fitted results of the map above.
val pipeline = new Pipeline().setStages(discretizers)
val model = pipeline.fit(df)
当我运行这段代码时，Spark 似乎把每个离散化器都当作一个单独的作业来执行。请问有没有办法（无论是否使用 Pipeline）把所有离散化器合并成一个作业来运行？
感谢帮助，不胜感激。
import org.apache.spark.ml.feature.QuantileDiscretizer
// Toy dataset: (id, hour) pairs to be bucketed into 3 quantile bins.
val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")
// Configure a single-column discretizer: "hour" -> "result", 3 buckets.
val discretizer = {
  val qd = new QuantileDiscretizer()
  qd.setInputCol("hour")
  qd.setOutputCol("result")
  qd.setNumBuckets(3)
  qd
}
// Fit computes the quantile splits; transform applies the bucketing.
val result = discretizer.fit(df).transform(df)
result.show()
上面的代码对单个列作为单个作业运行；下面的代码同样作为单个作业运行，但处理多个列：
/**
 * Build a QuantileDiscretizer for column `col`, writing the bucket index
 * to a new column named s"${col}_result" with `bucketNo` buckets.
 */
def discretizerFun (col: String, bucketNo: Int):
  org.apache.spark.ml.feature.QuantileDiscretizer = {
  val discretizer = new QuantileDiscretizer()
  discretizer
    .setInputCol(col)
    .setOutputCol(s"${col}_result")
    .setNumBuckets(bucketNo)
}
val data = Array((0, 18.0, 2.1), (1, 19.0, 14.1), (2, 8.0, 63.7), (3, 5.0,
  88.3), (4, 2.2, 0.8))
val df = spark.createDataFrame(data).toDF("id", "hour", "temp")
// Fix: the original fitted the "hour" discretizer and transformed df twice
// (once as the fit input, once as the transform input). Reuse the
// intermediate DataFrame instead of recomputing it.
val hourDf = discretizerFun("hour", 2).fit(df).transform(df)
val res = discretizerFun("temp", 4).fit(hourDf).transform(hourDf)
更好的做法或许是把该函数改写成 UDF，但问题在于它返回的是 org.apache.spark.ml.feature.QuantileDiscretizer 类型；如果这一点可以解决，就能得到一种进行惰性转换的简洁方法。
Spark 2.3.0 中添加了对此功能的支持，请参阅发布文档：
- 多个特征转换器的多列支持:
- [SPARK-13030]:OneHotEncoderEstimator (Scala/Java/Python)
- [SPARK-22397]:QuantileDiscretizer (Scala/Java)
- [SPARK-20542]:分桶器(Scala/Java/Python)
您现在可以使用setInputCols
和setOutputCols
来指定多个列,尽管它似乎还没有反映在官方文档中。与一次处理每一列一个作业相比,这个新补丁的性能得到了极大的提高。
您的示例可以修改如下:
import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.types.{StructType,StructField,DoubleType}
import org.apache.spark.ml.Pipeline
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import scala.util.Random
val nRows = 10000
val nCols = 1000
// Build an nRows x nCols DataFrame of random Doubles, columns C0..C{nCols-1}.
val data = sc.parallelize(0 until nRows).map { _ =>
  Row.fromSeq(Seq.fill(nCols)(Random.nextDouble))
}
val schema = StructType((0 until nCols).map(i => StructField(s"C$i", DoubleType, true)))
val df = spark.createDataFrame(data, schema)
df.cache()
// Collect the names of all Double-typed columns and discretize them together
// in ONE estimator via the multi-column API (setInputCols/setOutputCols).
val continuous = df.dtypes.collect { case (name, "DoubleType") => name }
val discretizer = new QuantileDiscretizer()
  .setInputCols(continuous)
  .setOutputCols(continuous.map(name => s"${name}_disc"))
  .setNumBuckets(3)
val pipeline = new Pipeline().setStages(Array(discretizer))
val model = pipeline.fit(df)
model.transform(df)
大家好，
我有如下的 ml 管道设置
import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.types.{StructType,StructField,DoubleType}
import org.apache.spark.ml.Pipeline
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import scala.util.Random
// Synthetic dataset: nRows x nCols random Doubles, columns named C0..C{nCols-1}.
val nRows = 10000
val nCols = 1000
val data = sc.parallelize(0 to nRows-1).map { _ => Row.fromSeq(Seq.fill(nCols)(Random.nextDouble)) }
val schema = StructType((0 to nCols-1).map { i => StructField("C" + i, DoubleType, true) } )
val df = spark.createDataFrame(data, schema)
df.cache()
//Get continuous feature name and discretize them
val continuous = df.dtypes.filter(_._2 == "DoubleType").map (_._1)
// NOTE(review): each .fit(df) in this map executes eagerly, once per column —
// this is the source of the one-job-per-discretizer behavior described below.
val discretizers = continuous.map(c => new QuantileDiscretizer().setInputCol(c).setOutputCol(s"${c}_disc").setNumBuckets(3).fit(df))
// The stages passed here are the already-fitted results of the map above.
val pipeline = new Pipeline().setStages(discretizers)
val model = pipeline.fit(df)
当我运行这段代码时，Spark 似乎把每个离散化器都当作一个单独的作业来执行。请问有没有办法（无论是否使用 Pipeline）把所有离散化器合并成一个作业来运行？感谢帮助，不胜感激。
import org.apache.spark.ml.feature.QuantileDiscretizer
// Toy dataset: (id, hour) pairs to be bucketed into 3 quantile bins.
val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")
// Configure a single-column discretizer: "hour" -> "result", 3 buckets.
val discretizer = {
  val qd = new QuantileDiscretizer()
  qd.setInputCol("hour")
  qd.setOutputCol("result")
  qd.setNumBuckets(3)
  qd
}
// Fit computes the quantile splits; transform applies the bucketing.
val result = discretizer.fit(df).transform(df)
result.show()
上面的代码对单个列作为单个作业运行；下面的代码同样作为单个作业运行，但处理多个列：
/**
 * Build a QuantileDiscretizer for column `col`, writing the bucket index
 * to a new column named s"${col}_result" with `bucketNo` buckets.
 */
def discretizerFun (col: String, bucketNo: Int):
  org.apache.spark.ml.feature.QuantileDiscretizer = {
  val discretizer = new QuantileDiscretizer()
  discretizer
    .setInputCol(col)
    .setOutputCol(s"${col}_result")
    .setNumBuckets(bucketNo)
}
val data = Array((0, 18.0, 2.1), (1, 19.0, 14.1), (2, 8.0, 63.7), (3, 5.0,
  88.3), (4, 2.2, 0.8))
val df = spark.createDataFrame(data).toDF("id", "hour", "temp")
// Fix: the original fitted the "hour" discretizer and transformed df twice
// (once as the fit input, once as the transform input). Reuse the
// intermediate DataFrame instead of recomputing it.
val hourDf = discretizerFun("hour", 2).fit(df).transform(df)
val res = discretizerFun("temp", 4).fit(hourDf).transform(hourDf)
更好的做法或许是把该函数改写成 UDF，但问题在于它返回的是 org.apache.spark.ml.feature.QuantileDiscretizer 类型；如果这一点可以解决，就能得到一种进行惰性转换的简洁方法。
Spark 2.3.0 中添加了对此功能的支持，请参阅发布文档：
- 多个特征转换器的多列支持:
- [SPARK-13030]:OneHotEncoderEstimator (Scala/Java/Python)
- [SPARK-22397]:QuantileDiscretizer (Scala/Java)
- [SPARK-20542]:分桶器(Scala/Java/Python)
您现在可以使用setInputCols
和setOutputCols
来指定多个列,尽管它似乎还没有反映在官方文档中。与一次处理每一列一个作业相比,这个新补丁的性能得到了极大的提高。
您的示例可以修改如下:
import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.types.{StructType,StructField,DoubleType}
import org.apache.spark.ml.Pipeline
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import scala.util.Random
val nRows = 10000
val nCols = 1000
// Build an nRows x nCols DataFrame of random Doubles, columns C0..C{nCols-1}.
val data = sc.parallelize(0 until nRows).map { _ =>
  Row.fromSeq(Seq.fill(nCols)(Random.nextDouble))
}
val schema = StructType((0 until nCols).map(i => StructField(s"C$i", DoubleType, true)))
val df = spark.createDataFrame(data, schema)
df.cache()
// Collect the names of all Double-typed columns and discretize them together
// in ONE estimator via the multi-column API (setInputCols/setOutputCols).
val continuous = df.dtypes.collect { case (name, "DoubleType") => name }
val discretizer = new QuantileDiscretizer()
  .setInputCols(continuous)
  .setOutputCols(continuous.map(name => s"${name}_disc"))
  .setNumBuckets(3)
val pipeline = new Pipeline().setStages(Array(discretizer))
val model = pipeline.fit(df)
model.transform(df)