Kryo serialization error in Spark job

I want to use Kryo serialization in my Spark job.

package com.test;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SerializeTest {

    public static class Toto implements Serializable {
        private static final long serialVersionUID = 6369241181075151871L;
        private String a;

        public String getA() {
            return a;
        }

        public void setA(String a) {
            this.a = a;
        }
    }

    private static final PairFunction<Toto, Toto, Integer> WRITABLE_CONVERTOR = new PairFunction<Toto, Toto, Integer>() {
        private static final long serialVersionUID = -7119334882912691587L;

        @Override
        public Tuple2<Toto, Integer> call(Toto input) throws Exception {
            return new Tuple2<Toto, Integer>(input, 1);
        }
    };

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SerializeTest");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(new Class<?>[]{Toto[].class});
        JavaSparkContext context = new JavaSparkContext(conf);

        List<Toto> list = new ArrayList<Toto>();
        list.add(new Toto());
        JavaRDD<Toto> cursor = context.parallelize(list, list.size());

        JavaPairRDD<Toto, Integer> writable = cursor.mapToPair(WRITABLE_CONVERTOR);
        writable.saveAsHadoopFile(args[0], Toto.class, Integer.class, SequenceFileOutputFormat.class);

        context.close();
    }

}

But I get this error:

java.io.IOException: Could not find a serializer for the Key class: 'com.test.SerializeTest.Toto'. Please ensure that the configuration 'io.serializations' is properly configured, if you're using custom serialization.
	at org.apache.hadoop.io.SequenceFile$Writer.init(SequenceFile.java:1179)
	at org.apache.hadoop.io.SequenceFile$Writer.<init>(SequenceFile.java:1094)
	at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:273)
	at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:530)
	at org.apache.hadoop.mapred.SequenceFileOutputFormat.getRecordWriter(SequenceFileOutputFormat.java:63)
	at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun.apply(PairRDDFunctions.scala:1068)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun.apply(PairRDDFunctions.scala:1059)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
	at org.apache.spark.scheduler.Task.run(Task.scala:64)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
15/09/21 17:49:14 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.IOException: Could not find a serializer for the Key class: 'com.test.SerializeTest.Toto'. Please ensure that the configuration 'io.serializations' is properly configured, if you're using custom serialization.
	[stack trace repeats as above]

Thanks.

This error is not related to Spark or Kryo at all.

When using a Hadoop output format, you need to make sure your key and value are both instances of Writable. Hadoop does not use Java serialization by default (and you don't want to use it either, because it is very inefficient).

You can check the io.serializations property in your configuration; you will see the list of serializers used, including org.apache.hadoop.io.serializer.WritableSerialization.
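For instance, you can print that property from a default Hadoop Configuration (a quick diagnostic sketch; the class and variable names here are illustrative, not from the original post):

import org.apache.hadoop.conf.Configuration;

public class PrintSerializations {
    public static void main(String[] args) {
        Configuration hadoopConf = new Configuration();
        // Prints the comma-separated list of serializer classes Hadoop will
        // try for SequenceFile keys/values; WritableSerialization is among them.
        System.out.println(hadoopConf.get("io.serializations"));
    }
}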

To fix this issue, your Toto class must implement Writable. You have the same problem with Integer: use IntWritable instead.
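Here is a minimal sketch of what that could look like, assuming Toto keeps the single String field from the question (the WritableUtils-based encoding is one possible choice, not taken from the original post):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;

public static class Toto implements Writable {
    private String a;

    // Hadoop instantiates keys reflectively, so a no-arg constructor is required.
    public Toto() {
    }

    @Override
    public void write(DataOutput out) throws IOException {
        WritableUtils.writeString(out, a); // length-prefixed and null-safe
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        a = WritableUtils.readString(in);
    }

    public String getA() {
        return a;
    }

    public void setA(String a) {
        this.a = a;
    }
}

The pair function and the save call then use IntWritable on the value side:

import org.apache.hadoop.io.IntWritable;

JavaPairRDD<Toto, IntWritable> writable = cursor.mapToPair(
        new PairFunction<Toto, Toto, IntWritable>() {
            @Override
            public Tuple2<Toto, IntWritable> call(Toto input) throws Exception {
                return new Tuple2<Toto, IntWritable>(input, new IntWritable(1));
            }
        });
writable.saveAsHadoopFile(args[0], Toto.class, IntWritable.class, SequenceFileOutputFormat.class);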