Spark 作业中的 Kryo 序列化错误
Kryo serialization error in Spark job
我想在 Spark 作业中使用 Kryo 序列化。
public class SerializeTest {
public static class Toto implements Serializable {
private static final long serialVersionUID = 6369241181075151871L;
private String a;
public String getA() {
return a;
}
public void setA(String a) {
this.a = a;
}
}
private static final PairFunction<Toto, Toto, Integer> WRITABLE_CONVERTOR = new PairFunction<Toto, Toto, Integer>() {
private static final long serialVersionUID = -7119334882912691587L;
@Override
public Tuple2<Toto, Integer> call(Toto input) throws Exception {
return new Tuple2<Toto, Integer>(input, 1);
}
};
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("SerializeTest");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(new Class<?>[]{Toto[].class});
JavaSparkContext context = new JavaSparkContext(conf);
List<Toto> list = new ArrayList<Toto>();
list.add(new Toto());
JavaRDD<Toto> cursor = context.parallelize(list, list.size());
JavaPairRDD<Toto, Integer> writable = cursor.mapToPair(WRITABLE_CONVERTOR);
writable.saveAsHadoopFile(args[0], Toto.class, Integer.class, SequenceFileOutputFormat.class);
context.close();
}
}
但是我有这个错误:
java.io.IOException: Could not find a serializer for the Key class: 'com.test.SerializeTest.Toto'. Please ensure that the configuration 'io.serializations' is properly configured, if you're usingcustom serialization.
at org.apache.hadoop.io.SequenceFile$Writer.init(SequenceFile.java:1179)
at org.apache.hadoop.io.SequenceFile$Writer.(SequenceFile.java:1094)
at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:273)
at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:530)
at org.apache.hadoop.mapred.SequenceFileOutputFormat.getRecordWriter(SequenceFileOutputFormat.java:63)
at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun.apply(PairRDDFunctions.scala:1068)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun.apply(PairRDDFunctions.scala:1059)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/09/21 17:49:14 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.IOException: Could not find a serializer for the Key class: 'com.test.SerializeTest.Toto'. Please ensure that the configuration 'io.serializations' is properly configured, if you're usingcustom serialization.
at org.apache.hadoop.io.SequenceFile$Writer.init(SequenceFile.java:1179)
at org.apache.hadoop.io.SequenceFile$Writer.(SequenceFile.java:1094)
at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:273)
at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:530)
at org.apache.hadoop.mapred.SequenceFileOutputFormat.getRecordWriter(SequenceFileOutputFormat.java:63)
at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun.apply(PairRDDFunctions.scala:1068)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun.apply(PairRDDFunctions.scala:1059)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
谢谢。
此错误与 Spark 和 Kryo 均无关。
使用 Hadoop 输出格式 时,您需要确保 key 和 value Writable
的实例。 Hadoop 默认不使用 Java 序列化(你也不想使用它,因为它非常低效)
您可以在配置中检查 io.serializations
属性,您会看到使用过的序列化器列表,包括 org.apache.hadoop.io.serializer.WritableSerialization
要解决此问题,您的 Toto
class 必须实施 Writable
。 Integer
也有同样的问题,请使用 IntWritable
.
我想在 Spark 作业中使用 Kryo 序列化。
public class SerializeTest {
public static class Toto implements Serializable {
private static final long serialVersionUID = 6369241181075151871L;
private String a;
public String getA() {
return a;
}
public void setA(String a) {
this.a = a;
}
}
private static final PairFunction<Toto, Toto, Integer> WRITABLE_CONVERTOR = new PairFunction<Toto, Toto, Integer>() {
private static final long serialVersionUID = -7119334882912691587L;
@Override
public Tuple2<Toto, Integer> call(Toto input) throws Exception {
return new Tuple2<Toto, Integer>(input, 1);
}
};
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("SerializeTest");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(new Class<?>[]{Toto[].class});
JavaSparkContext context = new JavaSparkContext(conf);
List<Toto> list = new ArrayList<Toto>();
list.add(new Toto());
JavaRDD<Toto> cursor = context.parallelize(list, list.size());
JavaPairRDD<Toto, Integer> writable = cursor.mapToPair(WRITABLE_CONVERTOR);
writable.saveAsHadoopFile(args[0], Toto.class, Integer.class, SequenceFileOutputFormat.class);
context.close();
}
}
但是我有这个错误:
java.io.IOException: Could not find a serializer for the Key class: 'com.test.SerializeTest.Toto'. Please ensure that the configuration 'io.serializations' is properly configured, if you're usingcustom serialization. at org.apache.hadoop.io.SequenceFile$Writer.init(SequenceFile.java:1179) at org.apache.hadoop.io.SequenceFile$Writer.(SequenceFile.java:1094) at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:273) at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:530) at org.apache.hadoop.mapred.SequenceFileOutputFormat.getRecordWriter(SequenceFileOutputFormat.java:63) at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90) at org.apache.spark.rdd.PairRDDFunctions$$anonfun.apply(PairRDDFunctions.scala:1068) at org.apache.spark.rdd.PairRDDFunctions$$anonfun.apply(PairRDDFunctions.scala:1059) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 15/09/21 17:49:14 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.IOException: Could not find a serializer for the Key class: 'com.test.SerializeTest.Toto'. Please ensure that the configuration 'io.serializations' is properly configured, if you're usingcustom serialization. at org.apache.hadoop.io.SequenceFile$Writer.init(SequenceFile.java:1179) at org.apache.hadoop.io.SequenceFile$Writer.(SequenceFile.java:1094) at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:273) at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:530) at org.apache.hadoop.mapred.SequenceFileOutputFormat.getRecordWriter(SequenceFileOutputFormat.java:63) at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90) at org.apache.spark.rdd.PairRDDFunctions$$anonfun.apply(PairRDDFunctions.scala:1068) at org.apache.spark.rdd.PairRDDFunctions$$anonfun.apply(PairRDDFunctions.scala:1059) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
谢谢。
此错误与 Spark 和 Kryo 均无关。
使用 Hadoop 输出格式 时,您需要确保 key 和 value Writable
的实例。 Hadoop 默认不使用 Java 序列化(你也不想使用它,因为它非常低效)
您可以在配置中检查 io.serializations
属性,您会看到使用过的序列化器列表,包括 org.apache.hadoop.io.serializer.WritableSerialization
要解决此问题,您的 Toto
class 必须实施 Writable
。 Integer
也有同样的问题,请使用 IntWritable
.