Apache Beam 批处理应用程序 - 计时器回调未执行

Apache Beam batch application - timer callback not executing

我目前正在使用 java api 构建光束批处理应用程序。 我的源数据集是有界的,可能包含也可能不包含时间戳。 我的应用程序基本上是从 bigquery tables 采购的,流程如下:

1 - 从 BQ

读取源代码 table

2 - 将数据分组并准备 google DLP api 调用 - API 每次调用都有限制,因此我使用状态来缓冲数据并触发达到一定大小的缓冲区后立即调用 API。

3 - 对于未达到阈值的剩余缓冲数据,应执行定时器回调以清除剩余数据。

然而,定时器回调没有被执行。下面是我的示例代码

@TimerId("stale")
private final TimerSpec staleSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

@StateId("buffer")
private final StateSpec<BagState<KV<Integer, TableRow>>> bufferedEvents = StateSpecs.bag();

@StateId("count")
private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();


@ProcessElement
public void process(ProcessContext context,
                    @StateId("buffer") BagState<KV<Integer, TableRow>> bufferState,
                    @StateId("count") ValueState<Integer> countState,
                    @TimerId("stale") Timer staleTimer) throws RuntimeException {

        if (firstNonNull(countState.read(), 0) == 0 ){
            staleTimer.offset(MAX_BUFFER_DURATION).setRelative();
        }

        int count = firstNonNull(countState.read(), 0);
        count = count + 1;
        countState.write(count);
        bufferState.add(context.element());
        int currentContentItemCount = count  * this.dlpTableHeader.get().size();

        if (currentContentItemCount >= (DLP_MAX_ITEM_PER_CALL - INTERNAL_BUFFER)){
               // execute call to cloud DLP
               // clear buffer and count etc
        }
}

@OnTimer("stale")
public void onStale(OnTimerContext context,
                    @StateId("buffer") BagState<KV<Integer, TableRow>> bufferState,
                    @StateId("count") ValueState<Integer> countState) throws IOException {
     if (Boolean.FALSE.equals(bufferState.isEmpty().read())){
      // call DLP api
      // clear buffer, count etc
     }

管道是这样的:

pipeline
.apply("Read BQ source data", BQSourceReader.readSource(sourceSerDeUtil, tableSchema, dlpArgsOptions))
.apply("Generate Fake KV", ParDo.of(new GenerateFakeKV()))
.apply("Inspect Content of DLPTables from buffered TableRows",
                        ParDo.of(new DLPInspectContentTransform(ValueProvider.StaticValueProvider.of(dlpTableHeader))));

我不确定为什么定时器回调没有被执行,我需要引入一个固定的 window 还是在每个元素中放置一个虚拟时间戳?提前谢谢你!

最后,我通过显式定义带触发器的全局 window 解决了这个问题。

Trigger subtrigger = AfterProcessingTime.pastFirstElementInPane();
Trigger maintrigger = Repeatedly.forever(subtrigger);