SparseVector vs DenseVector when using StandardScaler
I am normalizing a PySpark DataFrame with the following code:
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml import Pipeline
cols = ["a", "b", "c"]
df = spark.createDataFrame([(1, 0, 3), (2, 3, 2), (1, 3, 1), (3, 0, 3)], cols)
Pipeline(stages=[
    VectorAssembler(inputCols=cols, outputCol='features'),
    StandardScaler(withMean=True, inputCol='features', outputCol='scaledFeatures')
]).fit(df).transform(df).select(cols + ['scaledFeatures']).head()
This gives the expected result:
Row(a=1, b=0, c=3, scaledFeatures=DenseVector([-0.7833, -0.866, 0.7833]))
However, when I run the same pipeline on a (much larger) dataset loaded from a Parquet file, I get the following exception:
16/12/21 09:47:50 WARN TaskSetManager: Lost task 0.0 in stage 60.0 (TID 6370, 10.231.153.67): org.apache.spark.SparkException: Failed to execute user defined function($anonfun: (vector) => vector)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply2_2$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at scala.collection.Iterator$$anon.next(Iterator.scala:409)
at scala.collection.Iterator$$anon.next(Iterator.scala:409)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:121)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run.apply(PythonRDD.scala:328)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
Caused by: java.lang.IllegalArgumentException: Do not support vector type class org.apache.spark.mllib.linalg.SparseVector
at org.apache.spark.mllib.feature.StandardScalerModel.transform(StandardScaler.scala:160)
at org.apache.spark.ml.feature.StandardScalerModel$$anonfun.apply(StandardScaler.scala:167)
at org.apache.spark.ml.feature.StandardScalerModel$$anonfun.apply(StandardScaler.scala:167)
... 13 more
I noticed that here the VectorAssembler has converted my columns to mllib.linalg.SparseVector rather than the DenseVector used in the first case.
Is there a workaround?
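A quick way to see which concrete vector class the assembled column actually holds (a minimal sketch; parquet_df is a hypothetical name for the Parquet-backed DataFrame):
assembled = VectorAssembler(inputCols=cols, outputCol='features').transform(parquet_df)
# Presumably shows a SparseVector type here, while the small example above yields a DenseVector
print(type(assembled.select('features').head()['features']))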
It sounds like you want to build this into a custom Transformer so you can include it directly in your pipeline.
The following should do that for you.
from pyspark import keyword_only
from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.linalg import SparseVector, DenseVector, VectorUDT
from pyspark.sql.functions import udf
class AsDenseTransformer(Transformer, HasInputCol, HasOutputCol):
    """Pipeline stage that converts a vector column into DenseVectors."""

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(AsDenseTransformer, self).__init__()
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        # toArray() is available on both SparseVector and DenseVector, so the
        # UDF always returns a DenseVector regardless of the input representation.
        asDense = udf(lambda s: DenseVector(s.toArray()), VectorUDT())
        return dataset.withColumn(out_col, asDense(in_col))
Once you have defined it, you can instantiate it as a stage in your pipeline, right after the VectorAssembler:
Pipeline(stages=[
    VectorAssembler(inputCols=cols, outputCol='features'),
    AsDenseTransformer(inputCol='features', outputCol='features'),
    StandardScaler(withMean=True, inputCol='features', outputCol='scaledFeatures')
]).fit(df).transform(df).select(cols + ['scaledFeatures']).head()
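As a sanity check (a minimal sketch reusing the small df from the question; the same should hold for the Parquet-backed data), you can run the densifying stage on its own and confirm that the scaler now receives dense vectors:
assembled = VectorAssembler(inputCols=cols, outputCol='features').transform(df)
densified = AsDenseTransformer(inputCol='features', outputCol='features').transform(assembled)
# Each row's 'features' is now a DenseVector, whatever representation the assembler chose
print(densified.select('features').head())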