使用 Apache Beam 写入通用记录时出现 Avro "not open" 异常

Avro "not open" exception when writing generic records using Apache Beam

我正在使用 AvroIO.<MyCustomType>writeCustomTypeToGenericRecords() 在流式数据流作业中将通用记录写入 GCS。开始的几分钟似乎一切正常,但是,大约 10 分钟后,作业开始抛出以下错误:

java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: org.apache.avro.AvroRuntimeException: not open
        com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn.output(GroupAlsoByWindowsParDoFn.java:183)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
        org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger(ReduceFnRunner.java:1057)
        org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
        org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
        org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
        org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
        com.google.cloud.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:95)
        com.google.cloud.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
        org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:133)
        com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
        com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:200)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$000(StreamingDataflowWorker.java:136)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.avro.AvroRuntimeException: not open
        org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
        org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
        org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:237)
        com.google.cloud.dataflow.worker.StreamingSideInputDoFnRunner.processElement(StreamingSideInputDoFnRunner.java:72)
        com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:324)
        com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
        com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn.output(GroupAlsoByWindowsParDoFn.java:181)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
        org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger(ReduceFnRunner.java:1057)
        org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
        org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
        org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
        org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
        com.google.cloud.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:95)
        com.google.cloud.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
        org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:133)
        com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
        com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:200)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$000(StreamingDataflowWorker.java:136)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.avro.AvroRuntimeException: not open
        org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82)
        org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:299)
        org.apache.beam.sdk.io.AvroSink$AvroWriter.write(AvroSink.java:123)
        org.apache.beam.sdk.io.WriteFiles.writeOrClose(WriteFiles.java:550)
        org.apache.beam.sdk.io.WriteFiles.access$000(WriteFiles.java:112)
        org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:718)

数据流作业仍在继续正常运行。先提供一些关于该流式作业的背景:它从 Pub/Sub 中提取消息,创建一个 5 分钟的固定窗口(fixed window),在累积到 10,000 条消息时触发(以先到者为准),然后处理消息并最终写入一个 GCP 存储桶;其中每种特定类型的消息都会通过 .to(new AvroEventDynamicDestinations(avroBaseDir, schemaView)) 根据消息类型写入对应的文件夹。

更新 1: 查看此错误的时间戳,它似乎正好以 10 秒的间隔出现,所以每分钟 6 个。

我遇到过完全相同的异常。我的问题源自错误的 schema(架构)——确切地说是空的 schema(在 schema registry 中未找到对应条目)。