TypeError when calling Spark MLlib LogisticRegressionWithLBFGS.train

I am trying to call LogisticRegressionWithLBFGS.train from Spark MLlib on my training data to solve a multi-class logistic regression problem. My training set is built as:

# reg is assumed to be pyspark.mllib.regression; column 0 holds the label (shifted down by 1), the remaining columns hold the features
trainingData = sXYdf.rdd.map(lambda x: reg.LabeledPoint(x[0]-1, x[1:]))
trainingData.take(2)

The output of the LabeledPoints (2 rows) is shown below. (I am not printing the complete labels and features, since it is a 2x401 label-feature matrix, with the features occupying columns 1-401 and the label in column 0.) The same data looks like this:

[LabeledPoint(9.0, [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,8.56059679589e-06,1.94035947712e-06,-0.00073743872549,-0.0081340379902,-0.0186104473039,-0.0187412865354,-0.018757250817,-0.0190963541667...])]

Now when I call

lrm=LogisticRegressionWithLBFGS.train(trainingData,numClasses=10)

I get the following error:

TypeError                                 Traceback (most recent call last)
<ipython-input-20-9b0c5530b34b> in <module>()
      1 #lr=LogisticRegression(maxIter=10, regParam=0.0, elasticNetParam=0.0)
----> 2 lrm=LogisticRegressionWithLBFGS.train(trainingData,numClasses=10)

C:\spark-2.1.1-bin-hadoop2.7\spark-2.1.1-bin-hadoop2.7\python\pyspark\mllib\classification.py in train(cls, data, iterations, initialWeights, regParam, regType, intercept, corrections, tolerance, validateData, numClasses)
    396                 else:
    397                     initialWeights = [0.0] * len(data.first().features) * (numClasses - 1)
--> 398         return _regression_train_wrapper(train, LogisticRegressionModel, data, initialWeights)
    399 
    400 

C:\spark-2.1.1-bin-hadoop2.7\spark-2.1.1-bin-hadoop2.7\python\pyspark\mllib\regression.py in _regression_train_wrapper(train_func, modelClass, data, initial_weights)
    214         weights, intercept, numFeatures, numClasses = train_func(
    215             data, _convert_to_vector(initial_weights))
--> 216         return modelClass(weights, intercept, numFeatures, numClasses)
    217     else:
    218         weights, intercept = train_func(data, _convert_to_vector(initial_weights))

C:\spark-2.1.1-bin-hadoop2.7\spark-2.1.1-bin-hadoop2.7\python\pyspark\mllib\classification.py in __init__(self, weights, intercept, numFeatures, numClasses)
    174             self._dataWithBiasSize = self._coeff.size / (self._numClasses - 1)
    175             self._weightsMatrix = self._coeff.toArray().reshape(self._numClasses - 1,
--> 176                                                                 self._dataWithBiasSize)
    177 
    178     @property

TypeError: 'float' object cannot be interpreted as an integer

Added more logs: it looks like there is a problem creating the Python worker.

17/07/15 19:59:14 WARN TaskSetManager: Stage 123 contains a task of very large size (17658 KB). The maximum recommended task size is 100 KB.
17/07/15 19:59:24 ERROR Executor: Exception in task 0.0 in stage 123.0 (TID 123)
org.apache.spark.SparkException: Python worker did not connect back in time
        at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:138)
        at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:67)
        at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:116)
        at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:128)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
        at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
        at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
        at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
        at java.net.PlainSocketImpl.accept(Unknown Source)
        at java.net.ServerSocket.implAccept(Unknown Source)
        at java.net.ServerSocket.accept(Unknown Source)
        at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:133)
        ... 27 more
17/07/15 19:59:24 WARN TaskSetManager: Lost task 0.0 in stage 123.0 (TID 123, localhost, executor driver): org.apache.spark.SparkException: Python worker did not connect back in time
        at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:138)
        at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:67)
        at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:116)
        at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:128)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
        at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
        at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
        at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
        at java.net.PlainSocketImpl.accept(Unknown Source)
        at java.net.ServerSocket.implAccept(Unknown Source)
        at java.net.ServerSocket.accept(Unknown Source)
        at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:133)
        ... 27 more

17/07/15 19:59:24 ERROR TaskSetManager: Task 0 in stage 123.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "C:\Users\Sunil\Anaconda3\lib\runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "C:\Users\Sunil\Anaconda3\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "C:\spark-2.1.1-bin-hadoop2.7\spark-2.1.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 211, in <module>
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it
[I 20:01:12.525 NotebookApp] Saving file at /mltclasspyspark.ipynb

Well, it seems there is a bug in Spark 2.1.1 when running under Python 3 that produces the above error (I cannot reproduce it with Python 2.7).
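For context (this is my reading of the traceback, not something stated in the original report): the failing line computes self._coeff.size / (self._numClasses - 1) with "/", which under Python 3 is true division and returns a float, and NumPy's reshape then rejects the float dimension. A minimal sketch reproducing the same TypeError outside Spark, with sizes assumed for a 400-feature, 10-class model:

import numpy as np

# Assumed sizes for a 400-feature, 10-class multinomial model:
# the coefficient vector has (400 features + 1 bias) * (10 - 1) entries.
coeff_size = 401 * 9
num_classes = 10

rows = coeff_size / (num_classes - 1)    # Python 3 true division -> 401.0, a float
print(type(rows))                        # <class 'float'>

try:
    # Recent NumPy versions reject a float dimension, which is the error seen above.
    np.zeros(coeff_size).reshape(num_classes - 1, rows)
except TypeError as e:
    print(e)                             # 'float' object cannot be interpreted as an integer

# Integer division keeps the dimension an int and the reshape succeeds.
np.zeros(coeff_size).reshape(num_classes - 1, coeff_size // (num_classes - 1))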

So, if you cannot upgrade to Spark 2.1.2 or 2.2 (where the issue is reportedly resolved), or switch to Python 2.7, I suggest modifying your map function as follows so that your labels are integers rather than floats (not tested, though):

trainingData = sXYdf.rdd.map(lambda x: reg.LabeledPoint(int(x[0] - 1), x[1:]))  # cast the label to int
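If that does not help, another workaround worth trying (my own suggestion, not part of the original answer) is to switch to the DataFrame-based pyspark.ml.classification.LogisticRegression, which does not go through the failing mllib reshape code path. A rough sketch, assuming the same sXYdf layout (label in column 0, features afterwards) and an active SparkSession:

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

# Build a DataFrame with the "label"/"features" columns that pyspark.ml expects.
df = sXYdf.rdd.map(lambda x: (float(x[0] - 1), Vectors.dense(x[1:]))) \
              .toDF(["label", "features"])

# Multinomial logistic regression over the 10 classes.
lr = LogisticRegression(maxIter=100, family="multinomial")
lrModel = lr.fit(df)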