BQ 切换到 TIMESTAMP 分区 Table

BQ Switching to TIMESTAMP Partitioned Table

我正在尝试将 IngestionTime (_PARTITIONTIME) 迁移到 BQ 中的 TIMESTAMP 分区 tables。为此,我还需要添加几个必需的列。但是,当我翻转开关并将我的数据流重定向到新的 TIMESTAMP 分区 table 时,它会中断。注意事项:

几天来我一直在调查这个问题,并试图将过渡分解为尽可能小的步骤。似乎导致错误的步骤是引入 REQUIRED 变量(当相同变量为 NULLABLE 时它工作正常)。为了避免任何可能的解析错误,我为所有必需的变量设置了默认值。

目前,我遇到了以下错误组合,但我不确定如何解决其中的任何错误:

第一个错误,不经常重复但通常成群出现:

Profiling Agent not found. Profiles will not be available from this worker

大量出现且成群结队:

Can't verify serialized elements of type BoundedSource have well defined equals method. This may produce incorrect results on some PipelineRunner

似乎是其中的一大群:

Aborting Operations. java.lang.RuntimeException: Unable to read value from state

接近尾声时,此错误每 5 分钟出现一次,仅被如下所述的轻微解析错误所包围。

Processing stuck in step BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/ParMultiDo(WriteTables) for at least 20m00s without outputting or completing in state finish

由于我的项目解析的数据量巨大,存在一些解析错误,例如意外字符。它们很少见,但不应中断数据插入。如果他们这样做,我有一个更大的问题,因为我收集的数据经常变化,我只能在看到错误后调整解析器,因此,看到新的数据格式。此外,这不会导致摄取时间 table 中断(或我的其他时间戳分区 table 中断)。也就是说,这是一个解析错误的例子:

Error: Unexpected character (',' (code 44)): was expecting double-quote to start field name

编辑: 一些相关的示例代码:

public PipelineResult streamData() {
        try {
            GenericSection generic = new GenericSection(options.getBQProject(), options.getBQDataset(), options.getBQTable());
            Pipeline pipeline = Pipeline.create(options);

            pipeline.apply("Read PubSub Events", PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
                                              .apply(options.getWindowDuration() + " Windowing",  generic.getWindowDuration(options.getWindowDuration()))
                                              .apply(generic.getPubsubToString())
                                              .apply(ParDo.of(new CrowdStrikeFunctions.RowBuilder()))
                                              .apply(new BigQueryBuilder().setBQDest(generic.getBQDest())
                                                                          .setStreaming(options.getStreamingUpload())
                                                                          .setTriggeringFrequency(options.getTriggeringFrequency())

                                                                          .build());

            return pipeline.run();
        } 
        catch (Exception e) {
            LOG.error(e.getMessage(), e);
            return null;
        }

正在写入 BQ。我确实尝试直接在这里设置分区字段,但它似乎没有任何影响:

BigQueryIO.writeTableRows()
                .to(BQDest)
                .withMethod(Method.FILE_LOADS)
                .withNumFileShards(1000)
                .withTriggeringFrequency(this.triggeringFrequency)
                .withTimePartitioning(new TimePartitioning().setType("DAY"))
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER);
    }

经过大量挖掘,我发现了错误。我有一个解析逻辑 (a try/catch),在出现解析错误的情况下不返回任何内容(本质上是一个空行)。这会破坏 BigQuery,因为我的模式有几个必需的行。

由于我的作业 运行 是批处理的,所以即使是一个空行也会导致整个批处理作业失败并且不会插入任何内容。这也解释了为什么流式插入就好了。令我惊讶的是,BigQuery 没有抛出一个错误,声称我试图将 null 插入必填字段。

在得出这个结论时,我还意识到在我的代码中设置分区字段也是必要的,而不仅仅是在架构中。可以使用

来完成
.setField(partitionField)