Task Not Serializable Exception - When using JMSTemplate in Spark foreach
I am trying to use the Spring JmsTemplate class inside the rdd.foreach method, but I get a Task not serializable error.
When I tried making it a static variable instead, it works locally, but on the cluster I get a NullPointerException.
Sample code:
inputRDD.foreach(record -> {
    messageServices.send(record);
});
Error log:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
at org.apache.spark.rdd.RDD$$anonfun$foreach.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$foreach.apply(RDD.scala:868)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:327)
at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:47)
at com.messenger.MessengerDriver.runJob(MessengerDriver.java:108)
at com.messenger.MessengerDriver.main(MessengerDriver.java:60)
Caused by: java.io.NotSerializableException: org.springframework.jms.core.JmsTemplate
Serialization stack:
- object not serializable (class: org.springframework.jms.core.JmsTemplate, value: org.springframework.jms.core.JmsTemplate@3b98b809)
- field (class: com.messenger.Messenger.activemq.MessageProducer, name: jmsTemplate, type: class org.springframework.jms.core.JmsTemplate)
- object (class com.messenger.Messenger.activemq.MessageProducer, com.messenger.Messenger.activemq.MessageProducer@662e682a)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- field (class: org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach, name: f, type: interface org.apache.spark.api.java.function.VoidFunction)
- object (class org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
... 13 more
Has anyone run into the same problem?
The correct pattern is to use repartition together with mapPartitions.
repartition redistributes the RDD into an appropriate number of partitions; mapPartitions then processes each partition independently, so you can create the JmsTemplate inside the function you pass, once per partition. Because the template is constructed on the executor rather than captured by the closure, it never has to be serialized.
This also explains the static-variable behavior: locally the driver and executor share one JVM, so the static field is initialized; on a cluster the executor JVMs never run the driver's initialization code, so the field is null there.
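The per-partition pattern can be sketched as follows. FakeTemplate and processPartition are hypothetical stand-ins for JmsTemplate and the function body you would pass to mapPartitions; the point is that the non-serializable resource is created inside the function, once per partition, not captured from the driver:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class PerPartitionSend {

    // Stand-in for a non-serializable resource such as JmsTemplate.
    static class FakeTemplate {
        void send(String msg) { /* would publish to the broker here */ }
    }

    // Mirrors the body you would pass to mapPartitions: the template is
    // constructed on the executor for each partition, so it is never part
    // of the serialized closure shipped from the driver.
    static List<String> processPartition(Iterator<String> records) {
        FakeTemplate template = new FakeTemplate(); // one per partition
        List<String> acks = new ArrayList<>();
        while (records.hasNext()) {
            String record = records.next();
            template.send(record);
            acks.add("sent:" + record);
        }
        return acks;
    }

    public static void main(String[] args) {
        List<String> partition = List.of("a", "b", "c");
        System.out.println(processPartition(partition.iterator()));
    }
}
```

In the actual job this body would be wired up roughly as `inputRDD.repartition(n).mapPartitions(iter -> processPartition(iter).iterator())` (the exact functional interface depends on your Spark version); for a pure side effect with no result RDD, foreachPartition follows the same per-partition construction idea.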