通过 Spark MLlib Regression 估计一个数值

Estimate a numerical value through Spark MLlib Regression

我正在训练 Spark MLlib 线性回归器,但我相信我不理解部分库的实际使用。

我有 1 个特征 (NameItem) 和一个输出 (Accumulator)。 第一个是分类的(速度、温度等),第二个是双精度类型的数值。

训练集由数百万个条目组成,它们不是线性相关的(我检查了热图和相关指数)。

问题:我想通过线性回归估计给定NameItem值的Accumulator值,但我认为这不是我想要的我真的在做。

问题:我该怎么做?

我首先将数据集划分在training setdata set

(trainDF, testDF) = df.randomSplit((0.80, 0.20), seed=42)

之后我尝试了管道方法,正如大多数教程所示:

1) 我索引了 NameItem

indexer = StringIndexer(inputCol="NameItem", outputCol="CategorizedItem", handleInvalid = "keep")

2) 然后我编码了它

encoderInput = [indexer.getOutputCol()]
encoderOutput = ["EncodedItem"]
encoder = OneHotEncoderEstimator(inputCols=encoderInput, outputCols=encoderOutput)

3) 还组装好了

assemblerInput = encoderOutput
assembler = VectorAssembler(inputCols=assemblerInput, outputCol="features")

之后我继续有效的训练:

lr = LinearRegression(labelCol="Accumulator")
pipeline = Pipeline(stages=[indexer, encoder, assembler, lr])
lrModel = pipeline.fit(trainDF)

这就是我在测试集上应用预测时得到的结果:

predictions = lrModel.transform(testDF).show(5, False)
+--------------+-----------------+---------------+-----------------+-------------------------------+------------------+
|NameItem      |Accumulator      |CategorizedItem|EncodedItem      |features                       |prediction        |
+--------------+-----------------+---------------+-----------------+-------------------------------+------------------+
|Speed         |44000.00000000   |265.0          |(688,[265],[1.0])|(689,[265,688],[1.0,44000.0])  |44000.100892495786|
|Speed         |245000.00000000  |265.0          |(688,[265],[1.0])|(689,[265,688],[1.0,245000.0]) |245000.09963708033|
|Temp          |4473860.00000000 |66.0           |(688,[66],[1.0]) |(689,[66,688],[1.0,4473860.0]) |4473859.874261986 |
|Temp          |6065.00000000    |66.0           |(688,[66],[1.0]) |(689,[66,688],[1.0,6065.0])    |6065.097757082314 |
|Temp          |10140.00000000   |66.0           |(688,[66],[1.0]) |(689,[66,688],[1.0,10140.0])   |10140.097731630483|
+--------------+-----------------+---------------+-----------------+-------------------------------+------------------+
only showing top 5 rows

对于相同的分类特征(例如 Temp),我怎么可能得到 3 个不同的预测?

虽然和期望值很接近,但我还是觉得哪里不对。

How can it be possible that for the same categorical feature (for example Temp) I get 3 different predictions?

这是因为你的输出 Accumulator 不知何故进入了 features(当然不应该是这样),所以模型只是 "predicts"(本质上是复制)这个部分输入;这就是为什么预测是 so "accurate"...

似乎 VectorAssembler 搞砸了。问题是,你在这里并不真的需要 VectorAssembler,因为实际上你只有一个 "single" 特征(EncodedItem 中的单热编码稀疏向量)。这 可能 VectorAssembler 在这里表现如此的原因(它被要求 "assemble" 一个单一的功能),但无论如何这将是一个错误。

所以我建议去掉VectorAssembler,直接将EncodedItem重命名为features,即:

indexer = StringIndexer(inputCol="NameItem", outputCol="CategorizedItem", handleInvalid = "keep")

encoderInput = [indexer.getOutputCol()]
encoderOutput = ["features"]  # 1st change
encoder = OneHotEncoderEstimator(inputCols=encoderInput, outputCols=encoderOutput)

lr = LinearRegression(labelCol="Accumulator")
pipeline = Pipeline(stages=[indexer, encoder, lr])  # 2nd change
lrModel = pipeline.fit(trainDF)

更新(评论反馈后)

My Spark version Is 1.4.4

很遗憾,我无法重现该问题,原因很简单,因为我无法访问您正在使用的 Spark 1.4.4。但我已经确认它在最新版本的 Spark 2.4.4 中工作正常,这让我更倾向于相信 v1.4 中确实存在一些错误,但是随后已得到解决。

这是 Spark 2.4.4 中的复制品,使用了一些类似于您的虚拟数据:

spark.version
# '2.4.4'

from pyspark.ml.feature import VectorAssembler, OneHotEncoderEstimator, StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

# dummy data resembling yours:

df = spark.createDataFrame([['Speed', 44000], 
                            ['Temp', 23000], 
                            ['Temp', 5000], 
                            ['Speed', 75000], 
                            ['Weight', 5300], 
                            ['Height', 34500], 
                            ['Weight', 6500]], 
                            ['NameItem', 'Accumulator'])

df.show()
# result:
+--------+-----------+
|NameItem|Accumulator|
+--------+-----------+
|   Speed|      44000|
|    Temp|      23000|
|    Temp|       5000|
|   Speed|      75000|
|  Weight|       5300|
|  Height|      34500|
|  Weight|       6500|
+--------+-----------+

indexer = StringIndexer(inputCol="NameItem", outputCol="CategorizedItem", handleInvalid = "keep")

encoderInput = [indexer.getOutputCol()]
encoderOutput = ["EncodedItem"]
encoder = OneHotEncoderEstimator(inputCols=encoderInput, outputCols=encoderOutput)

assemblerInput = encoderOutput
assembler = VectorAssembler(inputCols=assemblerInput, outputCol="features")

lr = LinearRegression(labelCol="Accumulator")

pipeline = Pipeline(stages=[indexer, encoder, assembler, lr])
lrModel = pipeline.fit(df) 
lrModel.transform(df).show() # predicting on the same df, for simplicity

最后transform的结果是

+--------+-----------+---------------+-------------+-------------+------------------+
|NameItem|Accumulator|CategorizedItem|  EncodedItem|     features|        prediction|
+--------+-----------+---------------+-------------+-------------+------------------+
|   Speed|      44000|            2.0|(4,[2],[1.0])|(4,[2],[1.0])|           59500.0|
|    Temp|      23000|            1.0|(4,[1],[1.0])|(4,[1],[1.0])|14000.000000000004|
|    Temp|       5000|            1.0|(4,[1],[1.0])|(4,[1],[1.0])|14000.000000000004|
|   Speed|      75000|            2.0|(4,[2],[1.0])|(4,[2],[1.0])|           59500.0|
|  Weight|       5300|            0.0|(4,[0],[1.0])|(4,[0],[1.0])| 5900.000000000004|
|  Height|      34500|            3.0|(4,[3],[1.0])|(4,[3],[1.0])|           34500.0|
|  Weight|       6500|            0.0|(4,[0],[1.0])|(4,[0],[1.0])| 5900.000000000004|   
+--------+-----------+---------------+-------------+-------------+------------------+

从哪里可以看到:

  1. features 现在 包括输出变量 Accumulator 的值,因为它确实应该如此;事实上,正如我上面所说,features 现在与 EncodedItem 相同,使得 VectorAssembler 变得多余,正如我们应该期望的那样,因为我们只有一个特征。
  2. prediction 的值现在与 NameItem 的相同值相同,同样符合我们的预期,而且它们不太准确,因此更符合实际。

因此,您的问题肯定与您使用的非常过时 Spark 1.4.4 版有关。 Spark自v1.4以来有了飞跃,你应该认真考虑更新...