Spark kryo_serializers and Broadcast<Map<Object, Iterable<GowallaDataLocation>>> java.io.IOException: java.lang.UnsupportedOperationException

I get this exception when I try to access the broadcast variable:

17/03/26 03:04:23 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 10, 192.168.56.5, executor 1): java.io.IOException: java.lang.UnsupportedOperationException
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
    at GowallaTask.call(GowallaTask.java:214)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach.apply(JavaRDDLike.scala:351)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach.apply(JavaRDDLike.scala:351)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:917)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:917)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1944)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1944)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException
    at java.util.AbstractMap.put(AbstractMap.java:209)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:162)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:244)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun.apply(TorrentBroadcast.scala:286)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
    at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:287)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock.apply(TorrentBroadcast.scala:221)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
    ... 19 more

The exception occurs when I use the KryoSerializer:

    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    conf.set("spark.kryoserializer.buffer.mb", "24");

Here is my code:

JavaPairRDD<Object, Iterable<GowallaDataLocation>> line_RDD_2 = sc
        .textFile("/home/piero/gowalla_location.txt", 2)
        .map(new GowallaMapperDataLocation())
        .groupBy(new Function<GowallaDataLocation, Object>() {

            private static final long serialVersionUID = -6773509902594100325L;

            @Override
            public Object call(GowallaDataLocation v1) throws Exception {
                DateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
                return dateFormat.format(v1.getDATE());
            }
        }).persist(StorageLevel.MEMORY_AND_DISK_SER());

Broadcast<Map<Object, Iterable<GowallaDataLocation>>> broadcastVar_2 = sc.broadcast(line_RDD_2.collectAsMap());
//System.out.println(broadcastVar_2.getValue().size());

JavaRDD<Object> keys = line_RDD_2.keys().persist(StorageLevel.MEMORY_ONLY_SER());
line_RDD_2.unpersist();

keys.foreach(new VoidFunction<Object>() {

    private static final long serialVersionUID = -8148877518271969523L;

    @Override
    public void call(Object t) throws Exception {
        //System.out.println("KEY:" + t + " ");
        Iterable<GowallaDataLocation> dr = broadcastVar_2.getValue().get(t);
    }
});

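I suspect this happens because you are broadcasting the result of line_RDD_2.collectAsMap() directly. The broadcast value is then known only as a Map, Kryo does not know which concrete implementation to rebuild on the executor, and it ends up calling put on a map that inherits it from AbstractMap. That inherited put is the call that throws UnsupportedOperationException in your stack trace (AbstractMap.put, reached from MapSerializer.read).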
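To make that mechanism concrete, here is a minimal sketch in plain Java with no Spark or Kryo involved. `ReadOnlyView` is a hypothetical stand-in for whatever class `collectAsMap()` actually returns (the question does not show it), and `refillFails` only mimics the "put every entry back" step of a map deserializer; both names are assumptions for illustration.

```java
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class ReadOnlyMapDemo {

    // Hypothetical read-only wrapper: it only implements entrySet(), so any
    // call to put() falls through to AbstractMap.put, which always throws.
    static class ReadOnlyView<K, V> extends AbstractMap<K, V> {
        private final Map<K, V> backing;

        ReadOnlyView(Map<K, V> backing) {
            this.backing = backing;
        }

        @Override
        public Set<Map.Entry<K, V>> entrySet() {
            return Collections.unmodifiableMap(backing).entrySet();
        }
    }

    // Mimics a deserializer refilling a freshly created map entry by entry.
    // Returns true if the map rejects the put.
    static boolean refillFails(Map<String, String> target) {
        try {
            target.put("20100101", "some value");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, String> readOnly = new ReadOnlyView<String, String>(new HashMap<String, String>());
        Map<String, String> plain = new HashMap<String, String>();

        System.out.println(refillFails(readOnly)); // true
        System.out.println(refillFails(plain));    // false
    }
}
```

The read-only map fails in the same way as the `Caused by` part of the trace, while a plain HashMap accepts the entry.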

It is the same as when I do this:

Map<String, String> a = new HashMap<String, String>();
a.put("a", "b");
Set<String> c = a.keySet();
c.add("e");

I get an UnsupportedOperationException from AbstractCollection, which is easily solved by copying into a concrete collection:

Map<String, String> a = new HashMap<String, String>();
a.put("a", "b");
Set<String> c = new TreeSet<String>();
c.addAll(a.keySet());
c.add("e");
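The two snippets above can be combined into one runnable sketch. The class and method names (`KeySetCopyDemo`, `viewRejectsAdd`, `copyAndAdd`) are made up for this illustration; the behaviour shown is that of the standard `HashMap.keySet()` view and of a `TreeSet` copy.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class KeySetCopyDemo {

    // Tries to add directly to the keySet() view; returns true if it is rejected.
    static boolean viewRejectsAdd(Map<String, String> map, String extra) {
        try {
            map.keySet().add(extra);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    // Copies the keys into an independent TreeSet and adds to the copy instead.
    static Set<String> copyAndAdd(Map<String, String> map, String extra) {
        Set<String> copy = new TreeSet<String>(map.keySet());
        copy.add(extra);
        return copy;
    }

    public static void main(String[] args) {
        Map<String, String> a = new HashMap<String, String>();
        a.put("a", "b");

        System.out.println(viewRejectsAdd(a, "e")); // true
        System.out.println(copyAndAdd(a, "e"));     // [a, e]
        System.out.println(a.keySet());             // [a] (the map itself is unchanged)
    }
}
```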

If my guess is correct, you can solve it like this:

Map<Object, Iterable<GowallaDataLocation>> a = new HashMap<>();
a.putAll(line_RDD_2.collectAsMap());
Broadcast<Map<Object, Iterable<GowallaDataLocation>>> broadcastVar_2 = sc.broadcast(a);
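The point of the copy is that the object handed to sc.broadcast then has java.util.HashMap as its runtime class, whatever class the source map had. A small sketch of that, assuming a hypothetical helper `toHashMap` and using `Collections.unmodifiableMap` purely as a stand-in for a non-HashMap source:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class BroadcastCopyDemo {

    // Hypothetical helper: copies any Map into a plain java.util.HashMap.
    static <K, V> Map<K, V> toHashMap(Map<K, V> source) {
        return new HashMap<K, V>(source);
    }

    public static void main(String[] args) {
        Map<String, Integer> backing = new HashMap<String, Integer>();
        backing.put("20100101", 3);

        // Stand-in for a map whose runtime class is not HashMap.
        Map<String, Integer> readOnly = Collections.unmodifiableMap(backing);
        Map<String, Integer> copy = toHashMap(readOnly);

        System.out.println(readOnly.getClass().getName()); // java.util.Collections$UnmodifiableMap
        System.out.println(copy.getClass().getName());     // java.util.HashMap

        copy.put("20100102", 5); // the copy accepts new entries
        System.out.println(copy.size());                   // 2
    }
}
```

Whether this alone fixes the broadcast still depends on the guess above being right, so treat it as something to try rather than a confirmed fix.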

Let me know if this works.