Configure function/lambda serialization in Spark
How do I configure Spark to use KryoSerializer for lambda expressions? Or have I found a bug in Spark? We have no trouble serializing our data anywhere else; it is only these lambdas that go through the default serializer instead of Kryo.
Here is the code:
JavaPairRDD<String, IonValue> rdd; // provided
IonSexp filterExpression; // provided
Function<Tuple2<String, IonValue>, Boolean> filterFunc = record -> myCustomFilter(filterExpression, record);
rdd = rdd.filter(filterFunc);
It throws this exception:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD$$anonfun$filter.apply(RDD.scala:388)
at org.apache.spark.rdd.RDD$$anonfun$filter.apply(RDD.scala:387)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.filter(RDD.scala:387)
at org.apache.spark.api.java.JavaPairRDD.filter(JavaPairRDD.scala:99)
at com.example.SomeClass.process(SomeClass.java:ABC)
{more stuff}
Caused by: java.io.NotSerializableException: com.amazon.ion.impl.lite.IonSexpLite
Serialization stack:
- object not serializable (class: com.amazon.ion.impl.lite.IonSexpLite, value: (and (equals (literal 1) (path marketplace_id)) (equals (literal 351) (path product gl_product_group))))
- element of array (index: 1)
- array (class [Ljava.lang.Object;, size 2)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.example.SomeClass, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial com/example/SomeClass.lambda$processf20a2d2:(Lcom/amazon/ion/IonSexp;Lscala/Tuple2;)Ljava/lang/Boolean;, instantiatedMethodType=(Lscala/Tuple2;)Ljava/lang/Boolean;, numCaptured=2])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class com.example.SomeClass$$Lambda/263969036, com.example.SomeClass$$Lambda/263969036@31880efa)
- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$filter, name: f, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$filter, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 18 more
The offending filterExpression here is an Ion s-expression object, which does not implement java.io.Serializable. We are using the Kryo serializer and have registered and configured it so that it serializes these objects just fine.
The code that initializes the Spark configuration:
sparkConf = new SparkConf().setAppName("SomeAppName").setMaster("MasterLivesHere")
.set("spark.serializer", KryoSerializer.class.getCanonicalName())
.set("spark.kryo.registrator", KryoRegistrator.class.getCanonicalName())
.set("spark.kryo.registrationRequired", "false");
The code in the registrator:
kryo.register(com.amazon.ion.IonSexp.class);
kryo.register(Class.forName("com.amazon.ion.impl.lite.IonSexpLite"));
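For completeness, a minimal sketch of what the full registrator might look like (the class name is an assumption; only the two register calls are from the snippet above):
import com.esotericsoftware.kryo.Kryo;

public class MyKryoRegistrator implements org.apache.spark.serializer.KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        kryo.register(com.amazon.ion.IonSexp.class);
        try {
            // IonSexpLite is the concrete implementation class, so it is looked up
            // by name rather than referenced at compile time.
            kryo.register(Class.forName("com.amazon.ion.impl.lite.IonSexpLite"));
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Ion implementation class not on classpath", e);
        }
    }
}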
If I try to serialize that lambda manually with
SerializationUtils.serialize(filterFunc); // plain Java serialization
it fails with the same error, as expected, since filterExpression is not serializable. However, the following works:
sparkContext.env().serializer().newInstance().serialize(filterFunc, ClassTag$.MODULE$.apply(filterFunc.getClass())); // uses the serializer configured via spark.serializer, i.e. Kryo
which again is expected, since our Kryo setup can handle these objects.
So my question/confusion is: why does Spark try to serialize that lambda with org.apache.spark.serializer.JavaSerializer when we have explicitly configured it to use Kryo?
After some more digging, it turns out there is indeed a separate serializer for closures. Because of bugs in Kryo, the closure serializer is hardcoded to the default JavaSerializer and cannot be changed through configuration.
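One way to observe this from the driver, assuming the same access to Spark's private env() API used in the serialize() experiment above (a diagnostic sketch, not from the original post):
// The data serializer honors spark.serializer; the closure serializer does not.
System.out.println(sparkContext.env().serializer().getClass().getName());
// prints org.apache.spark.serializer.KryoSerializer
System.out.println(sparkContext.env().closureSerializer().getClass().getName());
// prints org.apache.spark.serializer.JavaSerializer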
This answer explains it well:
However, I was able to work around my particular problem using a broadcast. Here is what my code looks like now:
JavaSparkContext sparkContext; // provided
JavaPairRDD<String, IonValue> rdd; // provided
IonSexp filterExpression; // provided
Broadcast<IonSexp> filterExprBroadcast = sparkContext.broadcast(filterExpression);
rdd = rdd.filter(record -> myCustomFilter(filterExprBroadcast.value(), record));
filterExprBroadcast.destroy(false); // Only do this after an action is executed
Broadcast values are shipped the same way RDD data is, so they do go through the configured Kryo serializer. The lambda itself is still serialized by the hardcoded Java closure serializer, but now it only captures the Broadcast handle, which is Serializable, instead of the raw IonSexp.
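Since the root cause is only that the closure captures a non-Serializable object, another option (a sketch of my own, not from the original answer) is to capture a Serializable wrapper that round-trips the IonSexp through its text form:
// Hypothetical wrapper, assuming IonValue.toString() yields valid Ion text
// (true in practice for ion-java, though the exact formatting is unspecified).
import com.amazon.ion.IonSexp;
import com.amazon.ion.IonSystem;
import com.amazon.ion.system.IonSystemBuilder;
import java.io.Serializable;

public class SerializableSexp implements Serializable {
    private final String ionText;
    private transient IonSexp sexp; // rebuilt lazily after deserialization

    public SerializableSexp(IonSexp sexp) {
        this.ionText = sexp.toString();
        this.sexp = sexp;
    }

    public IonSexp get() {
        if (sexp == null) {
            IonSystem ion = IonSystemBuilder.standard().build();
            sexp = (IonSexp) ion.singleValue(ionText);
        }
        return sexp;
    }
}
The closure then captures only the wrapper, e.g. rdd.filter(record -> myCustomFilter(wrapped.get(), record)), and plain Java serialization of the closure succeeds. The broadcast approach above is still preferable when the expression is large or reused across many tasks, since it ships the value once per executor rather than once per task.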