通过 Spark MLlib Regression 估计一个数值
Estimate a numerical value through Spark MLlib Regression
我正在训练 Spark MLlib 线性回归器,但我相信我不理解部分库的实际使用。
我有 1 个特征 (NameItem
) 和一个输出 (Accumulator
)。
第一个是分类的(速度、温度等),第二个是双精度类型的数值。
训练集由数百万个条目组成,它们不是线性相关的(我检查了热图和相关指数)。
问题:我想通过线性回归估计给定NameItem
值的Accumulator
值,但我认为这不是我想要的我真的在做。
问题:我该怎么做?
我首先将数据集划分在training set
和data set
:
(trainDF, testDF) = df.randomSplit((0.80, 0.20), seed=42)
之后我尝试了管道方法,正如大多数教程所示:
1) 我索引了 NameItem
indexer = StringIndexer(inputCol="NameItem", outputCol="CategorizedItem", handleInvalid = "keep")
2) 然后我编码了它
encoderInput = [indexer.getOutputCol()]
encoderOutput = ["EncodedItem"]
encoder = OneHotEncoderEstimator(inputCols=encoderInput, outputCols=encoderOutput)
3) 还组装好了
assemblerInput = encoderOutput
assembler = VectorAssembler(inputCols=assemblerInput, outputCol="features")
之后我继续有效的训练:
lr = LinearRegression(labelCol="Accumulator")
pipeline = Pipeline(stages=[indexer, encoder, assembler, lr])
lrModel = pipeline.fit(trainDF)
这就是我在测试集上应用预测时得到的结果:
predictions = lrModel.transform(testDF).show(5, False)
+--------------+-----------------+---------------+-----------------+-------------------------------+------------------+
|NameItem |Accumulator |CategorizedItem|EncodedItem |features |prediction |
+--------------+-----------------+---------------+-----------------+-------------------------------+------------------+
|Speed |44000.00000000 |265.0 |(688,[265],[1.0])|(689,[265,688],[1.0,44000.0]) |44000.100892495786|
|Speed |245000.00000000 |265.0 |(688,[265],[1.0])|(689,[265,688],[1.0,245000.0]) |245000.09963708033|
|Temp |4473860.00000000 |66.0 |(688,[66],[1.0]) |(689,[66,688],[1.0,4473860.0]) |4473859.874261986 |
|Temp |6065.00000000 |66.0 |(688,[66],[1.0]) |(689,[66,688],[1.0,6065.0]) |6065.097757082314 |
|Temp |10140.00000000 |66.0 |(688,[66],[1.0]) |(689,[66,688],[1.0,10140.0]) |10140.097731630483|
+--------------+-----------------+---------------+-----------------+-------------------------------+------------------+
only showing top 5 rows
对于相同的分类特征(例如 Temp
),我怎么可能得到 3 个不同的预测?
虽然和期望值很接近,但我还是觉得哪里不对。
How can it be possible that for the same categorical feature (for example Temp
) I get 3 different predictions?
这是因为你的输出 Accumulator
不知何故进入了 features
(当然不应该是这样),所以模型只是 "predicts"(本质上是复制)这个部分输入;这就是为什么预测是 so "accurate"...
似乎 VectorAssembler
搞砸了。问题是,你在这里并不真的需要 VectorAssembler
,因为实际上你只有一个 "single" 特征(EncodedItem
中的单热编码稀疏向量)。这 可能 是 VectorAssembler
在这里表现如此的原因(它被要求 "assemble" 一个单一的功能),但无论如何这将是一个错误。
所以我建议去掉VectorAssembler
,直接将EncodedItem
重命名为features
,即:
indexer = StringIndexer(inputCol="NameItem", outputCol="CategorizedItem", handleInvalid = "keep")
encoderInput = [indexer.getOutputCol()]
encoderOutput = ["features"] # 1st change
encoder = OneHotEncoderEstimator(inputCols=encoderInput, outputCols=encoderOutput)
lr = LinearRegression(labelCol="Accumulator")
pipeline = Pipeline(stages=[indexer, encoder, lr]) # 2nd change
lrModel = pipeline.fit(trainDF)
更新(评论反馈后)
My Spark version Is 1.4.4
很遗憾,我无法重现该问题,原因很简单,因为我无法访问您正在使用的 Spark 1.4.4。但我已经确认它在最新版本的 Spark 2.4.4 中工作正常,这让我更倾向于相信 v1.4 中确实存在一些错误,但是随后已得到解决。
这是 Spark 2.4.4 中的复制品,使用了一些类似于您的虚拟数据:
spark.version
# '2.4.4'
from pyspark.ml.feature import VectorAssembler, OneHotEncoderEstimator, StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
# dummy data resembling yours:
df = spark.createDataFrame([['Speed', 44000],
['Temp', 23000],
['Temp', 5000],
['Speed', 75000],
['Weight', 5300],
['Height', 34500],
['Weight', 6500]],
['NameItem', 'Accumulator'])
df.show()
# result:
+--------+-----------+
|NameItem|Accumulator|
+--------+-----------+
| Speed| 44000|
| Temp| 23000|
| Temp| 5000|
| Speed| 75000|
| Weight| 5300|
| Height| 34500|
| Weight| 6500|
+--------+-----------+
indexer = StringIndexer(inputCol="NameItem", outputCol="CategorizedItem", handleInvalid = "keep")
encoderInput = [indexer.getOutputCol()]
encoderOutput = ["EncodedItem"]
encoder = OneHotEncoderEstimator(inputCols=encoderInput, outputCols=encoderOutput)
assemblerInput = encoderOutput
assembler = VectorAssembler(inputCols=assemblerInput, outputCol="features")
lr = LinearRegression(labelCol="Accumulator")
pipeline = Pipeline(stages=[indexer, encoder, assembler, lr])
lrModel = pipeline.fit(df)
lrModel.transform(df).show() # predicting on the same df, for simplicity
最后transform
的结果是
+--------+-----------+---------------+-------------+-------------+------------------+
|NameItem|Accumulator|CategorizedItem| EncodedItem| features| prediction|
+--------+-----------+---------------+-------------+-------------+------------------+
| Speed| 44000| 2.0|(4,[2],[1.0])|(4,[2],[1.0])| 59500.0|
| Temp| 23000| 1.0|(4,[1],[1.0])|(4,[1],[1.0])|14000.000000000004|
| Temp| 5000| 1.0|(4,[1],[1.0])|(4,[1],[1.0])|14000.000000000004|
| Speed| 75000| 2.0|(4,[2],[1.0])|(4,[2],[1.0])| 59500.0|
| Weight| 5300| 0.0|(4,[0],[1.0])|(4,[0],[1.0])| 5900.000000000004|
| Height| 34500| 3.0|(4,[3],[1.0])|(4,[3],[1.0])| 34500.0|
| Weight| 6500| 0.0|(4,[0],[1.0])|(4,[0],[1.0])| 5900.000000000004|
+--------+-----------+---------------+-------------+-------------+------------------+
从哪里可以看到:
features
现在 不 包括输出变量 Accumulator
的值,因为它确实应该如此;事实上,正如我上面所说,features
现在与 EncodedItem
相同,使得 VectorAssembler
变得多余,正如我们应该期望的那样,因为我们只有一个特征。
prediction
的值现在与 NameItem
的相同值相同,同样符合我们的预期,而且它们不太准确,因此更符合实际。
因此,您的问题肯定与您使用的非常过时 Spark 1.4.4 版有关。 Spark自v1.4以来有了飞跃,你应该认真考虑更新...
我正在训练 Spark MLlib 线性回归器,但我相信我不理解部分库的实际使用。
我有 1 个特征 (NameItem
) 和一个输出 (Accumulator
)。
第一个是分类的(速度、温度等),第二个是双精度类型的数值。
训练集由数百万个条目组成,它们不是线性相关的(我检查了热图和相关指数)。
问题:我想通过线性回归估计给定NameItem
值的Accumulator
值,但我认为这不是我想要的我真的在做。
问题:我该怎么做?
我首先将数据集划分在training set
和data set
:
(trainDF, testDF) = df.randomSplit((0.80, 0.20), seed=42)
之后我尝试了管道方法,正如大多数教程所示:
1) 我索引了 NameItem
indexer = StringIndexer(inputCol="NameItem", outputCol="CategorizedItem", handleInvalid = "keep")
2) 然后我编码了它
encoderInput = [indexer.getOutputCol()]
encoderOutput = ["EncodedItem"]
encoder = OneHotEncoderEstimator(inputCols=encoderInput, outputCols=encoderOutput)
3) 还组装好了
assemblerInput = encoderOutput
assembler = VectorAssembler(inputCols=assemblerInput, outputCol="features")
之后我继续有效的训练:
lr = LinearRegression(labelCol="Accumulator")
pipeline = Pipeline(stages=[indexer, encoder, assembler, lr])
lrModel = pipeline.fit(trainDF)
这就是我在测试集上应用预测时得到的结果:
predictions = lrModel.transform(testDF).show(5, False)
+--------------+-----------------+---------------+-----------------+-------------------------------+------------------+
|NameItem |Accumulator |CategorizedItem|EncodedItem |features |prediction |
+--------------+-----------------+---------------+-----------------+-------------------------------+------------------+
|Speed |44000.00000000 |265.0 |(688,[265],[1.0])|(689,[265,688],[1.0,44000.0]) |44000.100892495786|
|Speed |245000.00000000 |265.0 |(688,[265],[1.0])|(689,[265,688],[1.0,245000.0]) |245000.09963708033|
|Temp |4473860.00000000 |66.0 |(688,[66],[1.0]) |(689,[66,688],[1.0,4473860.0]) |4473859.874261986 |
|Temp |6065.00000000 |66.0 |(688,[66],[1.0]) |(689,[66,688],[1.0,6065.0]) |6065.097757082314 |
|Temp |10140.00000000 |66.0 |(688,[66],[1.0]) |(689,[66,688],[1.0,10140.0]) |10140.097731630483|
+--------------+-----------------+---------------+-----------------+-------------------------------+------------------+
only showing top 5 rows
对于相同的分类特征(例如 Temp
),我怎么可能得到 3 个不同的预测?
虽然和期望值很接近,但我还是觉得哪里不对。
How can it be possible that for the same categorical feature (for example
Temp
) I get 3 different predictions?
这是因为你的输出 Accumulator
不知何故进入了 features
(当然不应该是这样),所以模型只是 "predicts"(本质上是复制)这个部分输入;这就是为什么预测是 so "accurate"...
似乎 VectorAssembler
搞砸了。问题是,你在这里并不真的需要 VectorAssembler
,因为实际上你只有一个 "single" 特征(EncodedItem
中的单热编码稀疏向量)。这 可能 是 VectorAssembler
在这里表现如此的原因(它被要求 "assemble" 一个单一的功能),但无论如何这将是一个错误。
所以我建议去掉VectorAssembler
,直接将EncodedItem
重命名为features
,即:
indexer = StringIndexer(inputCol="NameItem", outputCol="CategorizedItem", handleInvalid = "keep")
encoderInput = [indexer.getOutputCol()]
encoderOutput = ["features"] # 1st change
encoder = OneHotEncoderEstimator(inputCols=encoderInput, outputCols=encoderOutput)
lr = LinearRegression(labelCol="Accumulator")
pipeline = Pipeline(stages=[indexer, encoder, lr]) # 2nd change
lrModel = pipeline.fit(trainDF)
更新(评论反馈后)
My Spark version Is 1.4.4
很遗憾,我无法重现该问题,原因很简单,因为我无法访问您正在使用的 Spark 1.4.4。但我已经确认它在最新版本的 Spark 2.4.4 中工作正常,这让我更倾向于相信 v1.4 中确实存在一些错误,但是随后已得到解决。
这是 Spark 2.4.4 中的复制品,使用了一些类似于您的虚拟数据:
spark.version
# '2.4.4'
from pyspark.ml.feature import VectorAssembler, OneHotEncoderEstimator, StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
# dummy data resembling yours:
df = spark.createDataFrame([['Speed', 44000],
['Temp', 23000],
['Temp', 5000],
['Speed', 75000],
['Weight', 5300],
['Height', 34500],
['Weight', 6500]],
['NameItem', 'Accumulator'])
df.show()
# result:
+--------+-----------+
|NameItem|Accumulator|
+--------+-----------+
| Speed| 44000|
| Temp| 23000|
| Temp| 5000|
| Speed| 75000|
| Weight| 5300|
| Height| 34500|
| Weight| 6500|
+--------+-----------+
indexer = StringIndexer(inputCol="NameItem", outputCol="CategorizedItem", handleInvalid = "keep")
encoderInput = [indexer.getOutputCol()]
encoderOutput = ["EncodedItem"]
encoder = OneHotEncoderEstimator(inputCols=encoderInput, outputCols=encoderOutput)
assemblerInput = encoderOutput
assembler = VectorAssembler(inputCols=assemblerInput, outputCol="features")
lr = LinearRegression(labelCol="Accumulator")
pipeline = Pipeline(stages=[indexer, encoder, assembler, lr])
lrModel = pipeline.fit(df)
lrModel.transform(df).show() # predicting on the same df, for simplicity
最后transform
的结果是
+--------+-----------+---------------+-------------+-------------+------------------+
|NameItem|Accumulator|CategorizedItem| EncodedItem| features| prediction|
+--------+-----------+---------------+-------------+-------------+------------------+
| Speed| 44000| 2.0|(4,[2],[1.0])|(4,[2],[1.0])| 59500.0|
| Temp| 23000| 1.0|(4,[1],[1.0])|(4,[1],[1.0])|14000.000000000004|
| Temp| 5000| 1.0|(4,[1],[1.0])|(4,[1],[1.0])|14000.000000000004|
| Speed| 75000| 2.0|(4,[2],[1.0])|(4,[2],[1.0])| 59500.0|
| Weight| 5300| 0.0|(4,[0],[1.0])|(4,[0],[1.0])| 5900.000000000004|
| Height| 34500| 3.0|(4,[3],[1.0])|(4,[3],[1.0])| 34500.0|
| Weight| 6500| 0.0|(4,[0],[1.0])|(4,[0],[1.0])| 5900.000000000004|
+--------+-----------+---------------+-------------+-------------+------------------+
从哪里可以看到:
features
现在 不 包括输出变量Accumulator
的值,因为它确实应该如此;事实上,正如我上面所说,features
现在与EncodedItem
相同,使得VectorAssembler
变得多余,正如我们应该期望的那样,因为我们只有一个特征。prediction
的值现在与NameItem
的相同值相同,同样符合我们的预期,而且它们不太准确,因此更符合实际。
因此,您的问题肯定与您使用的非常过时 Spark 1.4.4 版有关。 Spark自v1.4以来有了飞跃,你应该认真考虑更新...