在数据流中完成 BQ 写入后的 Apache Beam 写入状态信息
Apache Beam writing status information after BQ writes are done within the dataflow
我正在苦苦寻找BQ写完后写状态的好的解决方案
每个数据流要处理一个文件,没有错误发生后,应该将状态写入Firestore。
我的代码如下所示:
PCollection<TableRow> failedInserts = results.getFailedInserts();
failedInserts
.apply("Set Global Window",
Window.<TableRow>into(new GlobalWindows()))
.apply("Count failures", Count.globally()).apply(ParDo.of(new DoFn<Long, ReportStatusInfo>() {
@ProcessElement
public void processElement(final ProcessContext c) throws IOException {
Long errorNumbers = c.element();
if (errorNumbers > 1) {
//set status to failed
} else if (numberOfErrors == 0) {
//set status to ok
}
insert();
}
}))
它似乎无法正常工作,因为我的印象是它不会等待整个 BQ 写入过程完成。
关于如何解决我在数据流中的问题或为什么上述方法不起作用的任何其他想法?
仅在使用流式插入时才支持 getFailedInserts 方法,而不是文件加载。在那种模式下,你的代码会做你想做的事
我正在苦苦寻找BQ写完后写状态的好的解决方案
每个数据流要处理一个文件,没有错误发生后,应该将状态写入Firestore。
我的代码如下所示:
PCollection<TableRow> failedInserts = results.getFailedInserts();
failedInserts
.apply("Set Global Window",
Window.<TableRow>into(new GlobalWindows()))
.apply("Count failures", Count.globally()).apply(ParDo.of(new DoFn<Long, ReportStatusInfo>() {
@ProcessElement
public void processElement(final ProcessContext c) throws IOException {
Long errorNumbers = c.element();
if (errorNumbers > 1) {
//set status to failed
} else if (numberOfErrors == 0) {
//set status to ok
}
insert();
}
}))
它似乎无法正常工作,因为我的印象是它不会等待整个 BQ 写入过程完成。
关于如何解决我在数据流中的问题或为什么上述方法不起作用的任何其他想法?
仅在使用流式插入时才支持 getFailedInserts 方法,而不是文件加载。在那种模式下,你的代码会做你想做的事