org.apache.spark.SparkException: Task not serializable
Here is a working code example:
JavaPairDStream<String, String> messages = KafkaUtils.createStream(javaStreamingContext, zkQuorum, group, topicMap);
messages.print();
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});
I get the following error:
ERROR:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:140)
at org.apache.spark.streaming.api.java.JavaPairDStream.map(JavaPairDStream.scala:46)
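Since you are defining your map function with an anonymous inner class, the containing class must also be Serializable. Define your map function as a separate class, or make it a static inner class. From the Java documentation (http://docs.oracle.com/javase/8/docs/platform/serialization/spec/serial-arch.html):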
Note - Serialization of inner classes (i.e., nested classes that are not static member classes), including local and anonymous classes, is strongly discouraged for several reasons. Because inner classes declared in non-static contexts contain implicit non-transient references to enclosing class instances, serializing such an inner class instance will result in serialization of its associated outer class instance as well.
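To see the effect described in that note without a Spark cluster, here is a minimal sketch in plain Java. `SerFunction` and `Driver` are hypothetical stand-ins I made up for Spark's `Function` and for the class that builds the stream; they are not Spark API. The anonymous class here reads a field of the enclosing object so the captured reference is explicit; in the question's code the reference is implicit, as the quoted note describes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCaptureDemo {
    // Hypothetical stand-in for a serializable function interface.
    interface SerFunction<T, R> extends Serializable {
        R call(T input);
    }

    // Hypothetical stand-in for the enclosing driver class; it is NOT Serializable.
    static class Driver {
        private final String suffix;

        Driver(String suffix) {
            this.suffix = suffix;
        }

        // Anonymous class created in an instance method: it holds a reference
        // to the enclosing Driver, which it needs to read the suffix field.
        SerFunction<String, String> anonymousInInstanceMethod() {
            return new SerFunction<String, String>() {
                @Override
                public String call(String input) {
                    return input + suffix;
                }
            };
        }

        // Anonymous class created in a static initializer: there is no
        // enclosing instance, so nothing extra gets serialized.
        static final SerFunction<String, String> STATIC_FUNC = new SerFunction<String, String>() {
            @Override
            public String call(String input) {
                return input.trim();
            }
        };
    }

    // Returns true if Java serialization accepts the object, false if it
    // fails with NotSerializableException.
    static boolean isSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // prints false: the non-serializable Driver is dragged along
        System.out.println(isSerializable(new Driver("!").anonymousInInstanceMethod()));
        // prints true: no enclosing instance to serialize
        System.out.println(isSerializable(Driver.STATIC_FUNC));
    }
}
```

The first case fails for the same reason the Spark job does: serializing the function also tries to serialize the object it was created in.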
Just providing a code sample:
JavaDStream<String> lines = messages.map(mapFunc);
Declare the inner class as a static variable:
static Function<Tuple2<String, String>, String> mapFunc = new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
};
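The other option mentioned above, a named static nested class, might look roughly like the sketch below. To keep it runnable without Spark on the classpath, `SerFunction` is a hypothetical stand-in for Spark's `Function`, and `Map.Entry` stands in for `Tuple2`; the class and method names are my own. The round trip through Java serialization only imitates the function being shipped to another JVM.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.Map;

public class StaticNestedFunctionDemo {
    // Hypothetical stand-in for a serializable function interface.
    interface SerFunction<T, R> extends Serializable {
        R call(T input);
    }

    // Named static nested class: it has no reference to any enclosing
    // instance, so only this small object is serialized.
    static class ValueOfEntry implements SerFunction<Map.Entry<String, String>, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String call(Map.Entry<String, String> entry) {
            return entry.getValue();
        }
    }

    // Serializes the object to bytes and reads it back, wrapping checked
    // exceptions so callers do not have to declare them.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ValueOfEntry copy = roundTrip(new ValueOfEntry());
        Map.Entry<String, String> message = new AbstractMap.SimpleEntry<>("key", "message-body");
        System.out.println(copy.call(message)); // prints message-body
    }
}
```

In the Spark job the equivalent class would implement `Function<Tuple2<String, String>, String>` and be passed as `messages.map(new ValueOfTuple())`, where `ValueOfTuple` is whatever name you give it.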