Spark kryo_serializers and Broadcast<Map<Object, Iterable<GowallaDataLocation>>> java.io.IOException: java.lang.UnsupportedOperationException
I get this exception when I try to access a broadcast variable:
17/03/26 03:04:23 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 10, 192.168.56.5, executor 1): java.io.IOException: java.lang.UnsupportedOperationException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
at GowallaTask.call(GowallaTask.java:214)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach.apply(JavaRDDLike.scala:351)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach.apply(JavaRDDLike.scala:351)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:917)
at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:917)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException
at java.util.AbstractMap.put(AbstractMap.java:209)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:162)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:244)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun.apply(TorrentBroadcast.scala:286)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:287)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock.apply(TorrentBroadcast.scala:221)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
... 19 more
The exception only occurs when I use the KryoSerializer:
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.kryoserializer.buffer.mb", "24");
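For reference, a hedged sketch of the equivalent setup on newer Spark versions: the `spark.kryoserializer.buffer.mb` key is deprecated in favor of `spark.kryoserializer.buffer` with an explicit size suffix, and registering the custom class up front is generally recommended with Kryo (the app name here is made up):

```java
SparkConf conf = new SparkConf()
        .setAppName("GowallaTask") // hypothetical app name
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        // replacement for the deprecated "spark.kryoserializer.buffer.mb" key
        .set("spark.kryoserializer.buffer", "24m");
// Registering classes lets Kryo write compact class IDs instead of full names.
conf.registerKryoClasses(new Class<?>[]{GowallaDataLocation.class});
```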
Here is my code:
JavaPairRDD<Object, Iterable<GowallaDataLocation>> line_RDD_2 = sc
        .textFile("/home/piero/gowalla_location.txt", 2)
        .map(new GowallaMapperDataLocation())
        .groupBy(new Function<GowallaDataLocation, Object>() {
            private static final long serialVersionUID = -6773509902594100325L;

            @Override
            public Object call(GowallaDataLocation v1) throws Exception {
                DateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
                return dateFormat.format(v1.getDATE());
            }
        }).persist(StorageLevel.MEMORY_AND_DISK_SER());
Broadcast<Map<Object, Iterable<GowallaDataLocation>>> broadcastVar_2 = sc.broadcast(line_RDD_2.collectAsMap());
//System.out.println(broadcastVar_2.getValue().size());
JavaRDD<Object> keys = line_RDD_2.keys().persist(StorageLevel.MEMORY_ONLY_SER());
line_RDD_2.unpersist();
keys.foreach(new VoidFunction<Object>() {
    private static final long serialVersionUID = -8148877518271969523L;

    @Override
    public void call(Object t) throws Exception {
        //System.out.println("KEY:" + t + " ");
        Iterable<GowallaDataLocation> dr = broadcastVar_2.getValue().get(t);
    }
});
I suspect this happens because you are broadcasting `line_RDD_2.collectAsMap()` directly: the declared type of the broadcast is just `Map`, so Kryo does not know the concrete implementation and falls back to `AbstractMap` internally when it deserializes the value.
It is just as if I did this:
Map<String, String> a = new HashMap<String, String>();
a.put("a", "b");
Set<String> c = a.keySet();
c.add("e");
I would get an UnsupportedOperationException from AbstractCollection,
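This claim can be reproduced with plain JDK collections, no Spark involved: `HashMap.keySet()` returns a view whose `add()` is inherited from `AbstractCollection`, which throws. A minimal sketch:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class KeySetViewDemo {
    public static void main(String[] args) {
        Map<String, String> a = new HashMap<>();
        a.put("a", "b");
        Set<String> keys = a.keySet(); // a view, not a standalone Set
        try {
            keys.add("e"); // add() is inherited from AbstractCollection
        } catch (UnsupportedOperationException e) {
            // same exception family Kryo hits when it calls AbstractMap.put
            System.out.println("UnsupportedOperationException from the view");
        }
    }
}
```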
which is easy to work around by copying into a concrete collection:
Map<String, String> a = new HashMap<String, String>();
a.put("a", "b");
Set<String> c = new TreeSet<String>();
c.addAll(a.keySet());
c.add("e");
If my guess is right, you can fix your case the same way:
Map<Object, Iterable<GowallaDataLocation>> a = new HashMap<>();
a.putAll(line_RDD_2.collectAsMap());
Broadcast<Map<Object, Iterable<GowallaDataLocation>>> broadcastVar_2 = sc.broadcast(a);
Let me know if this works.
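The defensive-copy pattern can also be exercised without Spark, using `Collections.unmodifiableMap` as a hypothetical stand-in for the view returned by `collectAsMap()`: the view rejects `put()`, while a `HashMap` copy of it is a concrete, mutable class that Kryo's `MapSerializer` can instantiate and fill.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CopyBeforeBroadcastDemo {
    public static void main(String[] args) {
        Map<String, String> source = new HashMap<>();
        source.put("k1", "v1");
        // Stand-in for collectAsMap(): a Map whose concrete class forbids put().
        Map<String, String> collected = Collections.unmodifiableMap(source);
        try {
            collected.put("k2", "v2");
        } catch (UnsupportedOperationException e) {
            System.out.println("view rejects put()");
        }
        // Copy into a plain HashMap before broadcasting: mutable and Kryo-friendly.
        Map<String, String> copy = new HashMap<>(collected);
        copy.put("k2", "v2"); // works on the concrete copy
        System.out.println(copy.size()); // 2
    }
}
```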