Resolve SparkException: Task not serializable when importing PMML model
I want to import a PMML model and use Spark to compute scores with it. Everything works fine when I don't use Spark, but I can't use my method inside a mapper.
The problem is that I need an Evaluator object from org.jpmml.evaluator.Evaluator, which does not seem to be serializable. So I tried to make it Serializable with the following class:
package util;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.jpmml.evaluator.Evaluator;

public class SerializableEvaluator implements Serializable {

    private static final long serialVersionUID = 6631604036553063657L;

    private Evaluator evaluator;

    public SerializableEvaluator(Evaluator evaluator) {
        this.evaluator = evaluator;
    }

    public Evaluator getEvaluator() {
        return evaluator;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.writeObject(evaluator);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        // Must restore the field, not a local variable, or the evaluator is lost on deserialization
        this.evaluator = (Evaluator) in.readObject();
    }
}
I also made all my classes serializable.
Here is a sample of my code:
logger.info("Print 5 first rows----------------------------");

strTitanicRDD
    .take(5)
    .forEach(row -> logger.info(row));

logger.info("Print 5 first Titanic Obs---------------------");

strTitanicRDD
    .map(row -> new TitanicObservation(row))
    .take(5)
    .forEach(titanic -> logger.info(titanic.toString()));

logger.info("Print 5 first Scored Titanic Obs---------------");

try {
    strTitanicRDD
        .map(row -> new TitanicObservation(row))
        .map(new Function<TitanicObservation, String>() {

            private static final long serialVersionUID = -2968122030659306400L;

            @Override
            public String call(TitanicObservation titanic) throws Exception {
                return PmmlUtil.computeScoreTitanic(evaluator, titanic);
            }
        })
        .take(5)
        .forEach(row -> logger.info(row));
} catch (Exception e) {
    logger.error(e.getMessage(), e);
}
But I don't think my code alone will help you solve my problem; the problem itself is clear enough (see the log):
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
at org.apache.spark.rdd.RDD.map(RDD.scala:286)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:89)
at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)
at score.acv.AppWithSpark.main(AppWithSpark.java:117)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:577)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:174)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:197)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.xml.sax.helpers.LocatorImpl
Serialization stack:
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 15 more
Behind the org.jpmml.evaluator.Evaluator interface there is some instance of an org.jpmml.evaluator.ModelEvaluator subclass. The class ModelEvaluator and all its subclasses are serializable by design. The problem is related to the org.dmg.pmml.PMML object instance that you supplied to the ModelEvaluatorFactory#newModelManager(PMML) method in the beginning.
In brief, every PMML class model object can have SAX Locator information attached to it. This is very helpful during the development and testing stages for tracking down problematic XML content. However, this information should no longer be kept around in the production stage. You can disable SAX Locator information by configuring the JAXB runtime properly, or simply clear the existing SAX Locator instances by invoking PMMLObject#setLocator(Locatable) with a null argument. The latter functionality is formalized by the org.jpmml.model.visitors.LocatorNullifier Visitor class.
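The locator-clearing step might be sketched roughly as follows. This is a sketch, not a verbatim excerpt from JPMML-Spark; the unmarshalling call and the exact factory method may differ between JPMML-Evaluator versions (the answer references ModelEvaluatorFactory#newModelManager(PMML)):

import java.io.InputStream;

import javax.xml.transform.stream.StreamSource;

import org.dmg.pmml.PMML;
import org.jpmml.evaluator.Evaluator;
import org.jpmml.evaluator.ModelEvaluatorFactory;
import org.jpmml.model.JAXBUtil;
import org.jpmml.model.visitors.LocatorNullifier;

public class EvaluatorLoader {

    public static Evaluator loadEvaluator(InputStream is) throws Exception {
        // Unmarshal the PMML document
        PMML pmml = JAXBUtil.unmarshalPMML(new StreamSource(is));

        // Strip SAX Locator instances from every PMML class model object,
        // which makes the whole object graph serializable
        LocatorNullifier nullifier = new LocatorNullifier();
        nullifier.applyTo(pmml);

        // Only now build the Evaluator on top of the locator-free PMML object
        ModelEvaluatorFactory factory = ModelEvaluatorFactory.newInstance();
        return (Evaluator) factory.newModelManager(pmml);
    }
}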
For a complete example, please see the org.jpmml.spark.EvaluatorUtil utility class of the official JPMML-Spark project (especially lines 73 to 75). Why not use JPMML-Spark in the first place?
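Once the Evaluator deserializes cleanly, a common Spark pattern is to broadcast it once per job rather than capturing it in every task closure. The following is a sketch only; TitanicObservation and PmmlUtil.computeScoreTitanic are the asker's own classes from the question, and ScoringJob is a hypothetical wrapper:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.jpmml.evaluator.Evaluator;

public class ScoringJob {

    public static JavaRDD<String> score(JavaSparkContext sc,
                                        JavaRDD<String> strTitanicRDD,
                                        Evaluator evaluator) {
        // Ship the (now serializable) evaluator to the executors once
        final Broadcast<Evaluator> broadcast = sc.broadcast(evaluator);

        return strTitanicRDD
            .map(row -> new TitanicObservation(row))
            .map(titanic -> PmmlUtil.computeScoreTitanic(broadcast.value(), titanic));
    }
}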