Spark JPMML Import Issue
I am trying to import a PMML model file generated in R into a Spark context and use it to predict scores. Here is the code used in Spark:
JavaRDD<String> scoreData = data.map(new Function<String, String>() {

    @Override
    public String call(String line) throws Exception {
        String[] row = line.split(",");
        PMML pmml;
        Evaluator evaluator;

        FileSystem fs = FileSystem.get(new Configuration());
        FSDataInputStream inStr = fs.open(new Path("PATH_TO_PMML_FILE"));
        Source transformedSource = ImportFilter.apply(new InputSource(inStr));
        pmml = JAXBUtil.unmarshalPMML(transformedSource);
        System.out.println(pmml.getModels().get(0).getModelName());

        ModelEvaluatorFactory modelEvaluatorFactory = ModelEvaluatorFactory.newInstance();
        ModelEvaluator<?> modelEvaluator = modelEvaluatorFactory.newModelManager(pmml);
        System.out.println(modelEvaluator.getSummary());
        evaluator = (Evaluator) modelEvaluator;

        List<FieldName> activeFields = evaluator.getActiveFields();
        double[] features = new double[row.length - 2]; // row - {contact_id, label}
        StringBuilder strBld = new StringBuilder();
        Map<FieldName, FieldValue> arguments = new LinkedHashMap<FieldName, FieldValue>();
        strBld.append(row[0]);
        for (int i = 3; i <= row.length - 1; i++) {
            // from f1 - f16
            FieldValue activeValue = evaluator.prepare(activeFields.get(i - 3), Double.parseDouble(row[i]));
            arguments.put(activeFields.get(i - 3), activeValue);
        }
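        // The original snippet is truncated here. Judging from the stack trace
        // below (ModelEvaluator.evaluate at PMMLSpark_2.java:146), the mapper
        // presumably finishes along these lines; this is a sketch, not the
        // poster's actual code:
        Map<FieldName, ?> results = evaluator.evaluate(arguments);
        strBld.append(",").append(results.get(evaluator.getTargetFields().get(0)));
        return strBld.toString();
    }
});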
The code works fine when run in a core Java environment (without a Spark context), but when it is run as shown above, I get the following exception:
java.lang.NoSuchMethodError: com.google.common.collect.Range.closed(Ljava/lang/Comparable;Ljava/lang/Comparable;)Lcom/google/common/collect/Range;
at org.jpmml.evaluator.Classification$Type.<clinit>(Classification.java:278)
at org.jpmml.evaluator.ProbabilityDistribution.<init>(ProbabilityDistribution.java:26)
at org.jpmml.evaluator.GeneralRegressionModelEvaluator.evaluateClassification(GeneralRegressionModelEvaluator.java:333)
at org.jpmml.evaluator.GeneralRegressionModelEvaluator.evaluate(GeneralRegressionModelEvaluator.java:107)
at org.jpmml.evaluator.ModelEvaluator.evaluate(ModelEvaluator.java:266)
at org.zcoe.spark.pmml.PMMLSpark_2.call(PMMLSpark_2.java:146)
at org.zcoe.spark.pmml.PMMLSpark_2.call(PMMLSpark_2.java:1)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction.apply(JavaPairRDD.scala:999)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:813)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:813)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1503)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1503)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
The problem appears to be a compatibility issue with the Guava JAR files needed to run the code. I removed every JAR containing the com.google.common.collect.Range class from Spark's classpath, but the same problem persists.

Here are the details of the Spark job:
spark-submit --jars ./lib/pmml-evaluator-1.2.0.jar,./lib/pmml-model-1.2.2.jar,./lib/pmml-manager-1.1.20.jar,./lib/pmml-schema-1.2.2.jar,./lib/guava-15.0.jar --class
[Stage 0:> (0 + 2) / 2]15/06/26 14:39:15 ERROR YarnScheduler: Lost executor 1 on hslave2: remote Akka client disassociated
15/06/26 14:39:15 ERROR YarnScheduler: Lost executor 2 on hslave1: remote Akka client disassociated
[Stage 0:> (0 + 2) / 2]15/06/26 14:39:33 ERROR YarnScheduler: Lost executor 4 on hslave1: remote Akka client disassociated
15/06/26 14:39:33 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, hslave1): ExecutorLostFailure (executor 4 lost)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1191)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
Please let me know if I am doing something wrong.
You should let Spark and JPMML each have their own version of the Guava library. Modifying the base Spark installation is not a good idea when you can achieve your goal simply by repackaging your Spark application.

If you move your Spark application to Apache Maven, you can use the relocation feature of the Maven Shade Plugin to move JPMML's version of the Guava library to another package, such as org.jpmml.com.google. The example application of the JPMML-Cascading project performs this trick.
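For reference, a minimal pom.xml sketch of such a relocation (the plugin version shown is illustrative; any recent version supports relocations):

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.3</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <relocations>
                    <!-- Rewrite Guava's packages inside the uber-JAR so that the
                         Guava 15.0 needed by JPMML cannot clash with the Guava
                         version bundled by Spark/Hadoop -->
                    <relocation>
                        <pattern>com.google.common</pattern>
                        <shadedPattern>org.jpmml.com.google.common</shadedPattern>
                    </relocation>
                </relocations>
            </configuration>
        </execution>
    </executions>
</plugin>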
In addition, a benefit of moving to Apache Maven is that your Spark application will be delivered as an uber-JAR file, which greatly simplifies its deployment. For example, you are currently specifying pmml-manager-1.1.20.jar on the command line, which is not needed.
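After repackaging, the submission could be reduced to roughly the following (the uber-JAR file name is hypothetical; the main class is taken from the stack trace above):

spark-submit --master yarn --class org.zcoe.spark.pmml.PMMLSpark_2 pmml-spark-example-1.0-shaded.jar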