Py4JJavaError: An error occurred while calling o735.fit

Py4JJavaError: An error occurred while calling o735.fit

我试图在 Pyspark 中安装朴素贝叶斯分类器,但每当我尝试 运行 我的代码时,我都会收到以下错误:

Py4JJavaError: An error occurred while calling o735.fit. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 110.0 failed 1 times, most recent failure: Lost task 0.0 in stage 110.0 (TID 76) (POPRB executor driver): org.apache.spark.SparkException: Failed to execute user defined function (NaiveBayes$$Lambda70/739115029: (structtype:tinyint,size:int,indices:array<int,values:array>) => structtype:tinyint,size:int,indices:array<int,values:array>) at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:136) at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1196) at org.apache.spark.ml.stat.SummaryBuilderImpl$MetricsAggregate.update(Summarizer.scala:382) at org.apache.spark.ml.stat.SummaryBuilderImpl$MetricsAggregate.update(Summarizer.scala:345) at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.update(interfaces.scala:583) at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun.$anonfun$applyOrElse(AggregationIterator.scala:197) at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun.$anonfun$applyOrElse$adapted(AggregationIterator.scala:197) at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateProcessRow(AggregationIterator.scala:214) at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateProcessRow$adapted(AggregationIterator.scala:208) at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:169) at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.(ObjectAggregationIterator.scala:83) at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute(ObjectHashAggregateExec.scala:112) at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$adapted(ObjectHashAggregateExec.scala:88) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal(RDD.scala:885) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$adapted(RDD.scala:885) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:506) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source) Caused by: java.lang.IllegalArgumentException: requirement failed: Naive Bayes requires nonnegative feature values but found [9.0,3.0,10.0,2.0,3.0,4.0,3.0,0.0,3.0,1.0,0.0,-990.0,0.0,0.0,1.0,0.0,1.0,0.0]. at scala.Predef$.require(Predef.scala:281) at org.apache.spark.ml.classification.NaiveBayes$.requireNonnegativeValues(NaiveBayes.scala:359) at org.apache.spark.ml.classification.NaiveBayes.$anonfun$trainDiscreteImpl(NaiveBayes.scala:178) at org.apache.spark.sql.catalyst.expressions.ScalaUDF.$anonfun$f(ScalaUDF.scala:210) at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1192) ... 29 more

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage(DAGScheduler.scala:2403) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$adapted(DAGScheduler.scala:2402) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed(DAGScheduler.scala:1160) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$adapted(DAGScheduler.scala:1160) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573) at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:49) Caused by: org.apache.spark.SparkException: Failed to execute user defined function (NaiveBayes$$Lambda70/739115029: (structtype:tinyint,size:int,indices:array<int,values:array>) => structtype:tinyint,size:int,indices:array<int,values:array>) at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:136) at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1196) at org.apache.spark.ml.stat.SummaryBuilderImpl$MetricsAggregate.update(Summarizer.scala:382) at org.apache.spark.ml.stat.SummaryBuilderImpl$MetricsAggregate.update(Summarizer.scala:345) at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.update(interfaces.scala:583) at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun.$anonfun$applyOrElse(AggregationIterator.scala:197) at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun.$anonfun$applyOrElse$adapted(AggregationIterator.scala:197) at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateProcessRow(AggregationIterator.scala:214) at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateProcessRow$adapted(AggregationIterator.scala:208) at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:169) at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.(ObjectAggregationIterator.scala:83) at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute(ObjectHashAggregateExec.scala:112) at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$adapted(ObjectHashAggregateExec.scala:88) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal(RDD.scala:885) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$adapted(RDD.scala:885) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:506) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source) Caused by: java.lang.IllegalArgumentException: requirement failed: Naive Bayes requires nonnegative feature values but found [9.0,3.0,10.0,2.0,3.0,4.0,3.0,0.0,3.0,1.0,0.0,-990.0,0.0,0.0,1.0,0.0,1.0,0.0]. at scala.Predef$.require(Predef.scala:281) at org.apache.spark.ml.classification.NaiveBayes$.requireNonnegativeValues(NaiveBayes.scala:359) at org.apache.spark.ml.classification.NaiveBayes.$anonfun$trainDiscreteImpl(NaiveBayes.scala:178) at org.apache.spark.sql.catalyst.expressions.ScalaUDF.$anonfun$f(ScalaUDF.scala:210) at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1192) ... 29 more

这是我正在尝试的代码 运行:

from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(modelType="multinomial", labelCol='ProdTaken_numeric')

pipeline3 = Pipeline(stages=[featuresCreator,
                             nb])
model_nb = pipeline3.fit(df_travel_prediction_train).transform(df_travel_prediction_test)


from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy") 
nbaccuracy = evaluator.evaluate(model_nb) 
print("Test accuracy = " + str(nbaccuracy))

欢迎任何建议and/or帮助,提前致谢。

仔细查看后,我注意到数据集中的某些值为负数,因此朴素贝叶斯分类器无法正常工作。正如错误报告中的以下行所指定:

Caused by: java.lang.IllegalArgumentException: requirement failed: Naive Bayes requires nonnegative feature values but found [9.0,3.0,10.0,2.0,3.0,4.0,3.0,0.0,3.0,1.0,0.0,-990.0,0.0,0.0,1.0,0.0,1.0,0.0].

我通过将 abs() 应用到负值所在的列来修复此问题,因为它本来就不应该有任何负值。