Issues with Logistic Regression for multiclass classification using PySpark

I am trying to use Logistic Regression to classify a dataset whose feature vectors are SparseVector instances:

For the full code base and error log, please check my GitHub repo.

Case 1: I tried using an ML pipeline as follows:

# imported library from ML
from pyspark.ml.feature import HashingTF
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

print(type(trainingData))   # check the dataset type
print(trainingData.take(2)) # inspect the first two rows
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=maximumIteration, regParam=regParamValue)
pipeline = Pipeline(stages=[lr])
# Train model
model = pipeline.fit(trainingData)

This fails with the following error:

<class 'pyspark.sql.dataframe.DataFrame'>
[Row(label=2.0, features=SparseVector(2000, {51: 1.0, 160: 1.0, 341: 1.0, 417: 1.0, 561: 1.0, 656: 1.0, 863: 1.0, 939: 1.0, 1021: 1.0, 1324: 1.0, 1433: 1.0, 1573: 1.0, 1604: 1.0, 1720: 1.0})), Row(label=3.0, features=SparseVector(2000, {24: 1.0, 51: 2.0, 119: 1.0, 167: 1.0, 182: 1.0, 190: 1.0, 195: 1.0, 285: 1.0, 432: 1.0, 539: 1.0, 571: 1.0, 630: 1.0, 638: 1.0, 656: 1.0, 660: 2.0, 751: 1.0, 785: 1.0, 794: 1.0, 801: 1.0, 823: 1.0, 893: 1.0, 900: 1.0, 915: 1.0, 956: 1.0, 966: 1.0, 1025: 1.0, 1029: 1.0, 1035: 1.0, 1038: 1.0, 1093: 1.0, 1115: 2.0, 1147: 1.0, 1206: 1.0, 1252: 1.0, 1261: 1.0, 1262: 1.0, 1268: 1.0, 1304: 1.0, 1351: 1.0, 1378: 1.0, 1423: 1.0, 1437: 1.0, 1441: 1.0, 1530: 1.0, 1534: 1.0, 1556: 1.0, 1562: 1.0, 1604: 1.0, 1711: 1.0, 1737: 1.0, 1750: 1.0, 1776: 1.0, 1858: 1.0, 1865: 1.0, 1923: 1.0, 1926: 1.0, 1959: 1.0, 1999: 1.0}))]
16/08/25 19:14:07 ERROR org.apache.spark.ml.classification.LogisticRegression: Currently, LogisticRegression with ElasticNet in ML package only supports binary classification. Found 5 in the input dataset.
Traceback (most recent call last):
  File "/home/LR/test.py", line 260, in <module>
    accuracy = TrainLRCModel(trainData, testData)
  File "/home/LR/test.py", line 211, in TrainLRCModel
    model = pipeline.fit(trainingData)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/pipeline.py", line 69, in fit
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/pipeline.py", line 213, in _fit
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/pipeline.py", line 69, in fit
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 133, in _fit
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 130, in _fit_java
  File "/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco
  File "/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o207.fit.
: org.apache.spark.SparkException: Currently, LogisticRegression with ElasticNet in ML package only supports binary classification. Found 5 in the input dataset.
        at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:290)
        at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:159)
        at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
        at org.apache.spark.ml.Predictor.fit(Predictor.scala:71)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:209)
        at java.lang.Thread.run(Thread.java:745)

Case 2: I searched for alternatives to the above and found that LogisticRegressionWithLBFGS supports multiclass classification, so I tried it as follows:

#imported library
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel, LogisticRegressionWithSGD
print(type(trainingData)) # to check the dataset type
print(trainingData.take(2)) # To see the data
model = LogisticRegressionWithLBFGS.train(trainingData, numClasses=5)
print(type(model))

This fails with the following error:

<class 'pyspark.sql.dataframe.DataFrame'>
[Row(label=3.0, features=SparseVector(2000, {24: 1.0, 51: 2.0, 119: 1.0, 167: 1.0, 182: 1.0, 190: 1.0, 195: 1.0, 285: 1.0, 432: 1.0, 539: 1.0, 571: 1.0, 630: 1.0, 638: 1.0, 656: 1.0, 660: 2.0, 751: 1.0, 785: 1.0, 794: 1.0, 801: 1.0, 823: 1.0, 893: 1.0, 900: 1.0, 915: 1.0, 956: 1.0, 966: 1.0, 1025: 1.0, 1029: 1.0, 1035: 1.0, 1038: 1.0, 1093: 1.0, 1115: 2.0, 1147: 1.0, 1206: 1.0, 1252: 1.0, 1261: 1.0, 1262: 1.0, 1268: 1.0, 1304: 1.0, 1351: 1.0, 1378: 1.0, 1423: 1.0, 1437: 1.0, 1441: 1.0, 1530: 1.0, 1534: 1.0, 1556: 1.0, 1562: 1.0, 1604: 1.0, 1711: 1.0, 1737: 1.0, 1750: 1.0, 1776: 1.0, 1858: 1.0, 1865: 1.0, 1923: 1.0, 1926: 1.0, 1959: 1.0, 1999: 1.0})), Row(label=5.0, features=SparseVector(2000, {103: 1.0, 310: 1.0, 601: 1.0, 817: 1.0, 866: 1.0, 940: 1.0, 1023: 1.0, 1118: 1.0, 1339: 1.0, 1447: 1.0, 1634: 1.0, 1776: 1.0}))]
Traceback (most recent call last):
  File "/home/LR/test.py", line 260, in <module>
    accuracy = TrainLRCModel(trainData, testData)
  File "/home/LR/test.py", line 230, in TrainLRCModel
    model = LogisticRegressionWithLBFGS.train(trainingData, numClasses=5)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/mllib/classification.py", line 382, in train
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/mllib/regression.py", line 206, in _regression_train_wrapper
TypeError: data should be an RDD of LabeledPoint, but got <class 'pyspark.sql.types.Row'>

So I tried converting the dataset to an RDD of LabeledPoint, which is Case 3 below.

Case 3: I converted the dataset to an RDD of LabeledPoint so that I could use LogisticRegressionWithLBFGS, as follows:

    #imported libraries
    from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel, LogisticRegressionWithSGD
    from pyspark.mllib.regression import LabeledPoint

    print(type(trainingData))
    print(trainingData.take(2))
    trainingData = trainingData.map(lambda row:[LabeledPoint(row.label,row.features)])
    print('type of trainingData')
    print(type(trainingData))
    print(trainingData.take(2))
    model = LogisticRegressionWithLBFGS.train(trainingData, numClasses=5)
    print(type(model))

This fails with the following error:

<class 'pyspark.sql.dataframe.DataFrame'>
[Row(label=2.0, features=SparseVector(2000, {51: 1.0, 160: 1.0, 341: 1.0, 417: 1.0, 561: 1.0, 656: 1.0, 863: 1.0, 939: 1.0, 1021: 1.0, 1324: 1.0, 1433: 1.0, 1573: 1.0, 1604: 1.0, 1720: 1.0})), Row(label=3.0, features=SparseVector(2000, {24: 1.0, 51: 2.0, 119: 1.0, 167: 1.0, 182: 1.0, 190: 1.0, 195: 1.0, 285: 1.0, 432: 1.0, 539: 1.0, 571: 1.0, 630: 1.0, 638: 1.0, 656: 1.0, 660: 2.0, 751: 1.0, 785: 1.0, 794: 1.0, 801: 1.0, 823: 1.0, 893: 1.0, 900: 1.0, 915: 1.0, 956: 1.0, 966: 1.0, 1025: 1.0, 1029: 1.0, 1035: 1.0, 1038: 1.0, 1093: 1.0, 1115: 2.0, 1147: 1.0, 1206: 1.0, 1252: 1.0, 1261: 1.0, 1262: 1.0, 1268: 1.0, 1304: 1.0, 1351: 1.0, 1378: 1.0, 1423: 1.0, 1437: 1.0, 1441: 1.0, 1530: 1.0, 1534: 1.0, 1556: 1.0, 1562: 1.0, 1604: 1.0, 1711: 1.0, 1737: 1.0, 1750: 1.0, 1776: 1.0, 1858: 1.0, 1865: 1.0, 1923: 1.0, 1926: 1.0, 1959: 1.0, 1999: 1.0}))]
type of trainingData
<class 'pyspark.rdd.PipelinedRDD'>
[[LabeledPoint(2.0, (2000,[51,160,341,417,561,656,863,939,1021,1324,1433,1573,1604,1720],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))], [LabeledPoint(3.0, (2000,[24,51,119,167,182,190,195,285,432,539,571,630,638,656,660,751,785,794,801,823,893,900,915,956,966,1025,1029,1035,1038,1093,1115,1147,1206,1252,1261,1262,1268,1304,1351,1378,1423,1437,1441,1530,1534,1556,1562,1604,1711,1737,1750,1776,1858,1865,1923,1926,1959,1999],[1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))]]
Traceback (most recent call last):
  File "/home/LR/test.py", line 260, in <module>
    accuracy = TrainLRCModel(trainData, testData)
  File "/home/LR/test.py", line 230, in TrainLRCModel
    model = LogisticRegressionWithLBFGS.train(trainingData, numClasses=5)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/mllib/classification.py", line 381, in train
AttributeError: 'list' object has no attribute 'features'

Can someone suggest what I am missing? I want to use logistic regression in PySpark for multiclass classification.

I am currently using Spark version 1.6.2 and Python 2.7.9 on Google Cloud.

Thanks in advance for your help.

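Case 1: Nothing strange here. Simply put (as the error message says), LogisticRegression in the ML package of the Spark version you are using (1.6.2) supports only binary classification, not multiclass, as clearly stated in the documentation.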

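Case 2: Here you have switched from ML to MLlib, which does not work with DataFrames; it requires the input to be an RDD of LabeledPoint (documentation), so again the error message is expected.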

Case 3: Here is where things get interesting. First, you should remove the brackets from your map function, i.e. it should be

trainingData = trainingData.map(lambda row: LabeledPoint(row.label, row.features)) # no brackets after "row:"
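To see why the brackets matter, here is a small Spark-free sketch. It uses Python's built-in `map` and `namedtuple` as hypothetical stand-ins for the RDD `map` and for `LabeledPoint` (the names `Row`, `Point`, and `rows` are illustrative only and do not come from your code). With the brackets, each element is a one-item list, which is what leads to `AttributeError: 'list' object has no attribute 'features'`.

```python
from collections import namedtuple

# Hypothetical stand-ins for a DataFrame Row and an MLlib LabeledPoint.
Row = namedtuple("Row", ["label", "features"])
Point = namedtuple("Point", ["label", "features"])

rows = [Row(2.0, {51: 1.0}), Row(3.0, {24: 1.0, 51: 2.0})]

# With brackets: every element is a list wrapping a single Point.
wrapped = list(map(lambda row: [Point(row.label, row.features)], rows))

# Without brackets: every element is the Point itself.
flat = list(map(lambda row: Point(row.label, row.features), rows))

print(type(wrapped[0]).__name__)  # list
print(type(flat[0]).__name__)     # Point
print(hasattr(wrapped[0], "features"), hasattr(flat[0], "features"))  # False True
```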

Nevertheless, guessing from the code snippets you have provided, you will most probably now get a different error:

model = LogisticRegressionWithLBFGS.train(trainingData, numClasses=5)
[...]
: org.apache.spark.SparkException: Input validation failed.

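Here is what happens (it took me some time to figure out), using some dummy data (it is always a good idea to provide some sample data with your question):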

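from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

# 3-class classification with labels 1.0, 2.0, 3.0
# (sc is the SparkContext, e.g. as provided by the pyspark shell)
data = sc.parallelize([
     LabeledPoint(3.0, SparseVector(100,[10, 98],[1.0, 1.0])),
     LabeledPoint(1.0, SparseVector(100,[1, 22],[1.0, 1.0])),
     LabeledPoint(2.0, SparseVector(100,[36, 54],[1.0, 1.0]))
])

lrm = LogisticRegressionWithLBFGS.train(data, iterations=10, numClasses=3) # throws exception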
[...]
: org.apache.spark.SparkException: Input validation failed.

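The problem is that your labels must start from 0 (this is not documented anywhere; you have to dig into the Scala source code to see that this is the case!). So, mapping the labels in my dummy data above from (1.0, 2.0, 3.0) to (0.0, 1.0, 2.0), we finally get: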

# 3-class classification
data = sc.parallelize([
     LabeledPoint(2.0, SparseVector(100,[10, 98],[1.0, 1.0])),
     LabeledPoint(0.0, SparseVector(100,[1, 22],[1.0, 1.0])),
     LabeledPoint(1.0, SparseVector(100,[36, 54],[1.0, 1.0]))
])

lrm = LogisticRegressionWithLBFGS.train(data, iterations=10, numClasses=3) # no error now
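As a rough, Spark-free sketch of the rule (an assumption based on the behaviour shown above, not a copy of Spark's validation code): for `numClasses=k`, every label is expected to be a whole number between 0 and k-1. The helper name `invalid_labels` is hypothetical.

```python
def invalid_labels(labels, num_classes):
    """Return the labels that fall outside {0, 1, ..., num_classes - 1}."""
    return [
        label for label in labels
        if label != int(label) or label < 0 or label > num_classes - 1
    ]

# Labels from the first dummy dataset: 3.0 is out of range for 3 classes.
print(invalid_labels([1.0, 2.0, 3.0], 3))  # [3.0]

# Labels from the second dummy dataset: all valid.
print(invalid_labels([0.0, 1.0, 2.0], 3))  # []
```

Running a check like this on the distinct labels of your dataset before training can tell you quickly whether relabelling is needed.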

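Judging from your numClasses=5 argument, as well as from the label=5.0 in one of your printed records, I guess that your code most probably suffers from the same issue. Change your labels so that they lie in the range 0.0 to 4.0 and you should be fine.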
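One possible way to do the relabelling, sketched here without Spark: build a dictionary from the distinct original labels to consecutive values starting at 0.0, then apply it inside the `map` that creates the `LabeledPoint` objects. The names `build_label_index` and `label_index` are hypothetical, and the label list assumes your classes are exactly 1.0 through 5.0.

```python
def build_label_index(labels):
    """Map each distinct label to a consecutive float index starting at 0.0."""
    return {label: float(i) for i, label in enumerate(sorted(set(labels)))}

# Assumption: the dataset contains exactly the labels 1.0 through 5.0.
label_index = build_label_index([1.0, 2.0, 3.0, 4.0, 5.0])
print(label_index[1.0], label_index[5.0])  # 0.0 4.0

# Hypothetical use with the Case 3 DataFrame (needs a running SparkContext):
# trainingData = trainingData.map(
#     lambda row: LabeledPoint(label_index[row.label], row.features))
```

Keep the dictionary around so that predictions can be mapped back to the original labels afterwards.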

(I suggest you delete the other identical question you have opened here, to reduce clutter...)