How to select a single encoder for all subclasses of Avro SpecificRecordBase in Apache Beam?
Background
My Beam pipeline is meant to process elements of type Avro SpecificRecordBase.
To simplify the question, say I have two kinds of elements produced in Avro format, each with its own fields:
class Dog extends SpecificRecordBase {
....
}
class Cat extends SpecificRecordBase {
...
}
The pipeline reads elements from an input Kafka topic, processes them, and puts the processed elements into an output Kafka topic, like this:
Pipeline pipeline = Pipeline.create(getOptions());
pipeline.getCoderRegistry().registerCoderForClass(SpecificRecordBase.class, <what shall I put here?>);
pipeline.apply(kafkaReaderTransformer)
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(getWindowSize()))))
.apply(GroupByKey.create())
       .apply(ParDo.of(new GiveShowerToPetDoFn()))
.apply(Flatten.iterables())
.apply(kafkaWriterTransformer);
Problem
My question is: how shall I register the coder in my pipeline?
Since in the future the pipeline may read from a Cat Kafka topic, a Dog Kafka topic, or even a Toad Kafka topic, I need a generic way to register one coder that can serialize any subclass of SpecificRecordBase, decided at run time.
My failed solutions
I tried the following to fill the blank in the code above:
AvroCoder.of(SpecificRecordBase.class): does not work
I got the following error when running the pipeline:
Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroRuntimeException: Not a Specific class: class org.apache.avro.specific.SpecificRecordBase
at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234)
at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965)
at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969)
at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829)
at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225)
... 23 more
Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: class org.apache.avro.specific.SpecificRecordBase
at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
at org.apache.avro.specific.SpecificData.load(SpecificData.java:218)
at org.apache.avro.specific.SpecificData.load(SpecificData.java:215)
at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
... 27 more
SerializableCoder.of(SpecificRecordBase.class): throws a confusing exception
This looked like a promising option, but when I ran the pipeline I got the very confusing error below. It is confusing because Cat actually is serializable through its inheritance from SpecificRecordBase:
Caused by: java.lang.ClassCastException: Cat cannot be cast to java.io.Serializable
at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:53)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:578)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:569)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
at org.apache.beam.runners.spark.coders.CoderHelpers.toByteArray(CoderHelpers.java:53)
at org.apache.beam.runners.spark.coders.CoderHelpers.lambda$toByteFunctione77fe8(CoderHelpers.java:143)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun.apply(JavaPairRDD.scala:1043)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun.apply(JavaPairRDD.scala:1043)
at scala.collection.Iterator$$anon.next(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Registering no coder at all, letting Beam infer one: works only locally
The idea was that the system would infer a suitable coder for me. This do-nothing solution worked on my local standalone machine, but once I moved it to a real multi-server environment, it simply threw an exception saying it could not infer a coder:
Caused by: java.lang.IllegalStateException: Unable to return a default Coder for ParDo(Deserialize)/ParMultiDo(Deserialize).output [PCollection]. Correct one of the following root causes:
No Coder has been manually specified; you may do so using .setCoder().
Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<java.lang.String, org.apache.avro.specific.SpecificRecordBase>: Unable to provide a Coder for org.apache.avro.specific.SpecificRecordBase.
Building a Coder using a registered CoderProvider failed.
See suppressed exceptions for detailed failures.
Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:507)
at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:278)
at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:115)
at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:191)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:538)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:473)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:357)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
I finally solved the problem posted here with a workaround.
Root cause
It turned out to be caused by incompatible coders across environments. Although the coder worked in my local environment, the production environment had different dependency versions, which left the Beam library unable to encode and decode classes derived from SpecificRecordBase.
Two solutions
1) Change every DoFn in the pipeline to take bytes as input and produce bytes as output:
public class GiveShowerToPetDoFn extends DoFn<KV<String, byte[]>, KV<String, byte[]>> {
...
}
This means you manually deserialize the object from bytes before running the actual business logic, and serialize the result back to bytes as the last step. It makes Beam use its default byte coder between the applied DoFns, and that coder always works because it handles a primitive type rather than a custom one.
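As a rough illustration, here is a minimal sketch that fills in the elided body above. It assumes the Kafka payload is plain Avro binary with no schema header, pins the record type to Cat for readability, and uses a hypothetical giveShower helper in place of the real business logic:
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

public class GiveShowerToPetDoFn extends DoFn<KV<String, byte[]>, KV<String, byte[]>> {

    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
        // Step 1: manually deserialize the payload bytes into the typed record.
        SpecificDatumReader<Cat> reader = new SpecificDatumReader<>(Cat.class);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(c.element().getValue(), null);
        Cat cat = reader.read(null, decoder);

        // Step 2: run the actual business logic on the typed record.
        giveShower(cat); // hypothetical helper standing in for the real logic

        // Step 3: serialize the result back to bytes as the last step, so Beam
        // only ever moves byte[] between transforms and its byte coder applies.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        SpecificDatumWriter<Cat> writer = new SpecificDatumWriter<>(Cat.class);
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(cat, encoder);
        encoder.flush();
        c.output(KV.of(c.element().getKey(), out.toByteArray()));
    }

    private void giveShower(Cat cat) {
        // ... business logic ...
    }
}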
2) Write your own encoder/decoder (a custom Coder) for your custom types.
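One possible shape for such a coder is sketched below. This is a minimal illustration rather than the exact coder from my fix: it prefixes each payload with the concrete class name so that decoding can pick the right SpecificRecordBase subclass at run time.
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

public class SpecificRecordBaseCoder extends CustomCoder<SpecificRecordBase> {

    @Override
    public void encode(SpecificRecordBase value, OutputStream outStream) throws IOException {
        // Write the concrete class name first so decode() knows which generated
        // Avro class (and therefore which schema) to use.
        StringUtf8Coder.of().encode(value.getClass().getName(), outStream);
        SpecificDatumWriter<SpecificRecordBase> writer = new SpecificDatumWriter<>(value.getSchema());
        // A direct (non-buffering) encoder avoids writing past this record's
        // slice of the stream when the coder is nested inside another coder.
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(outStream, null);
        writer.write(value, encoder);
        encoder.flush();
    }

    @Override
    public SpecificRecordBase decode(InputStream inStream) throws IOException {
        String className = StringUtf8Coder.of().decode(inStream);
        try {
            Class<?> clazz = Class.forName(className);
            @SuppressWarnings("unchecked")
            SpecificDatumReader<SpecificRecordBase> reader =
                    new SpecificDatumReader<>((Class<SpecificRecordBase>) clazz);
            // Direct decoder for the same reason: it must not read ahead.
            BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(inStream, null);
            return reader.read(null, decoder);
        } catch (ClassNotFoundException e) {
            throw new IOException("Cannot load record class " + className, e);
        }
    }
}
With a coder like this, the blank in the original registration line can be filled in as:
pipeline.getCoderRegistry().registerCoderForClass(SpecificRecordBase.class, new SpecificRecordBaseCoder());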
Solution 1 and solution 2 are essentially the same thing. In my case, I used the first one to fix my problem.