Apache Beam java.lang.IllegalArgumentException: Invalid lambda deserialization
I am using Apache Beam to write data to BigQuery. My pipeline runs fine locally inside IntelliJ with the Direct runner, and I can write to the BigQuery table from there.
However, as soon as I deploy the code to a Spark cluster, I get the exception "java.lang.IllegalArgumentException: Invalid lambda deserialization".
User class threw exception: java.lang.IllegalArgumentException: unable to deserialize Custom DoFn With Execution Info
at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:78)
at org.apache.beam.runners.core.construction.ParDoTranslation.doFnWithExecutionInformationFromProto(ParDoTranslation.java:712)
at org.apache.beam.runners.core.construction.ParDoTranslation.getSchemaInformation(ParDoTranslation.java:392)
at org.apache.beam.runners.core.construction.ParDoTranslation.getSchemaInformation(ParDoTranslation.java:377)
at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator.evaluate(StreamingTransformTranslator.java:432)
at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator.evaluate(StreamingTransformTranslator.java:409)
at org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:449)
at org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:438)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:593)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access0(TransformHierarchy.java:240)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:469)
at org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call(SparkRunnerStreamingContextFactory.java:88)
at org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call(SparkRunnerStreamingContextFactory.java:46)
at org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun.apply(JavaStreamingContext.scala:627)
at org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun.apply(JavaStreamingContext.scala:626)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:848)
at org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:626)
at org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:180)
at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:96)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
at com.somecompany.beam.BeamApplication.run(BeamApplication.java:43)
at com.somecompany.SparkApp.main(L1LoaderSparkApp.java:19)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon.run(ApplicationMaster.scala:685)
Caused by: java.io.IOException: unexpected exception type
at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:75)
... 36 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260)
... 52 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
at org.apache.beam.sdk.io.gcp.bigquery.ErrorContainer.$deserializeLambda$(ErrorContainer.java:33)
... 62 more
My writer class looks like this:
import java.io.Serializable;

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.springframework.beans.factory.annotation.Qualifier;

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method.STREAMING_INSERTS;

public class MyTermsBigQueryWriter implements Serializable {

    @Qualifier("bigQueryProperties")
    private BigQueryProperties bigQueryProperties;

    public BigQueryIO.Write<MyTerms> myTermsWriter() {
        final BigQueryIO.Write<MyTerms> myTermsWrite = BigQueryIO.<MyTerms>write()
                .withMethod(STREAMING_INSERTS)
                .withExtendedErrorInfo()
                .withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry())
                .withJsonSchema(getMyTermsSchemaFile())
                .withFormatFunction(new SerializableFunction<MyTerms, TableRow>() {
                    @Override
                    public TableRow apply(MyTerms kt) {
                        return MyTermsBigQueryWriter.this.getTableRowForMyTermsTable(kt);
                    }
                })
                .to(getTableSpec())
                .withFormatRecordOnFailureFunction(new SerializableFunction<MyTerms, TableRow>() {
                    @Override
                    public TableRow apply(MyTerms myTerms) {
                        return MyTermsBigQueryWriter.this.getTableRowForMyTermsTable(myTerms);
                    }
                })
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
        return myTermsWrite;
    }
}
public class MyTerms implements Serializable {
    public String xx;
    public String xy;
    // equals() and hashCode() implementations omitted for brevity
}
If I don't use my writer, the pipeline runs fine on the cluster.
I have tried replacing the lambdas with method references and with anonymous inner classes, but with no success.
Any ideas?
It turned out that the org.apache.avro.generic.GenericData.Record class was causing the serialization problem. We removed it in favour of a simple POJO, and everything started working.
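As a hedged sketch of that fix, assuming the records previously flowed through the pipeline as Avro GenericData.Record values: convert them to the plain MyTerms POJO in an early MapElements step, so that nothing Avro-related is captured in the serialized pipeline graph that the Spark runner ships to the cluster. The field names "xx" and "xy" are taken from the POJO above; the conversion step itself is an assumption.

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

public class AvroToMyTerms {

    // Map each Avro GenericRecord to the simple MyTerms POJO as early as
    // possible; downstream transforms (including the BigQuery writer) then
    // only ever see plain serializable POJOs, never Avro classes.
    static PCollection<MyTerms> toMyTerms(PCollection<GenericRecord> avroRecords) {
        return avroRecords.apply("AvroToMyTerms",
                MapElements.into(TypeDescriptor.of(MyTerms.class))
                        .via(new SerializableFunction<GenericRecord, MyTerms>() {
                            @Override
                            public MyTerms apply(GenericRecord record) {
                                MyTerms terms = new MyTerms();
                                // Field names assumed from the MyTerms POJO above.
                                terms.xx = String.valueOf(record.get("xx"));
                                terms.xy = String.valueOf(record.get("xy"));
                                return terms;
                            }
                        }));
    }
}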