How to fix "Joining unbounded PCollections is currently only supported for non-global windows with triggers" in Apache Beam

I am trying to join two unbounded sources using the Apache Beam Java SDK. When applying the join I get the following error.

Exception in thread "main" java.lang.UnsupportedOperationException: Joining unbounded PCollections is currently only supported for non-global windows with triggers that are known to produce output once per window, such as the default trigger with zero allowed lateness. In these cases Beam can guarantee it joins all input elements once per window. WindowingStrategy{windowFn=org.apache.beam.sdk.transforms.windowing.SlidingWindows@1b87117, allowedLateness=PT0S, trigger=Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute)), accumulationMode=DISCARDING_FIRED_PANES, timestampCombiner=EARLIEST} is not supported
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.verifySupportedTrigger(BeamJoinRel.java:341)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.access$000(BeamJoinRel.java:98)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel$StandardJoin.expand(BeamJoinRel.java:330)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel$StandardJoin.expand(BeamJoinRel.java:308)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:67)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.lambda$buildPCollectionList$0(BeamSqlRelUtils.java:48)
    at java.util.stream.ReferencePipeline.accept(ReferencePipeline.java:193)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.buildPCollectionList(BeamSqlRelUtils.java:49)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:65)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:36)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:100)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:76)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
    at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:167)
    at xyz.xyz.main(xyz.java:64)

I have tried both fixed and sliding windows, with triggers (AfterWatermark.pastEndOfWindow and AfterProcessingTime.pastFirstElementInPane) and zero allowed lateness. I have also tried both accumulating and discarding fired panes. I get the same error message every time.

Below are the two snippets I tried, one with a fixed window and one with a sliding window.

p1.apply("window",
    Window
      .<Row>into(FixedWindows.of(Duration.standardSeconds(50)))
      .triggering(AfterWatermark.pastEndOfWindow())
      .withAllowedLateness(Duration.ZERO)
      .accumulatingFiredPanes());
p1.apply("window2",
    Window.<Row>into(
        SlidingWindows
          .of(Duration.standardSeconds(30))
          .every(Duration.standardSeconds(5)))
      .triggering(
        Repeatedly
          .forever(
             AfterProcessingTime
               .pastFirstElementInPane()
               .plusDelayOf(Duration.standardMinutes(1))))
      .withAllowedLateness(Duration.ZERO)
      .discardingFiredPanes());

All I want is to apply a SQL transform over a sliding window with a delayed trigger and allowed lateness. Please guide me through it.

Thanks, Gautham

As of now (2.13.0), BeamSQL does not support joining unbounded PCollections with non-default triggers. Such joins are only allowed with the default trigger (so that exactly one result is emitted per window).

The main reason is that the current Beam Java SDK implementation lacks a mechanism (known as retractions and accumulation) to refine the emitted data in cases like joins.
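For reference, a join where both sides are windowed identically with the default trigger and zero allowed lateness should pass BeamJoinRel.verifySupportedTrigger. The snippet below is only an illustrative sketch (not from the original post): the table names, column names, window size, and the stream1/stream2 variables are made up, and both inputs are assumed to be PCollection<Row> with compatible schemas.

// Both unbounded inputs share the same non-global window with the default
// trigger (no .triggering(...)) and the default zero allowed lateness, which
// is the shape of join BeamSQL currently accepts.
PCollection<Row> windowedLeft = stream1.apply("windowLeft",
    Window.<Row>into(FixedWindows.of(Duration.standardSeconds(30))));
PCollection<Row> windowedRight = stream2.apply("windowRight",
    Window.<Row>into(FixedWindows.of(Duration.standardSeconds(30))));

// The tuple tag ids become the table names in the SQL query.
PCollection<Row> joined = PCollectionTuple
    .of(new TupleTag<Row>("left_stream"), windowedLeft)
    .and(new TupleTag<Row>("right_stream"), windowedRight)
    .apply(SqlTransform.query(
        "SELECT l.id, r.val "
            + "FROM left_stream l JOIN right_stream r ON l.id = r.id"));

Note that this only emits once per window at the end of the window, so it does not give you the early or repeated firings you are asking about.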

Based on the comments, if I understand correctly, the desired behavior is to:

  • join two streams;
  • emit results every 30 seconds of wall-clock time;
  • if an element has no match yet, wait up to 30 minutes for the matching record to arrive;
  • drop the records after 30 minutes;

Basically, this is a continuous sliding join over the last 30 minutes of data from both streams, emitting results every 30 seconds.

The good news is that this should be doable in Beam Java (and probably in Python as well). The bad news is that it will likely be non-trivial in Java, and I don't think it is possible in SQL at all at the moment.

Roughly, it could look like this:

  • the input should be in the global window;
  • have a stateful ParDo that keeps track of all elements seen so far by storing them in a state cell (a rough sketch is shown after this list):
    • you will probably need a side input or a CoGroupByKey beforehand to get access to the elements of both inputs in the same ParDo;
    • side inputs and CoGroupByKey have different semantics and may not be easy to use;
  • on each input element, manually check the state for matching records;
  • either emit the results right away or keep them in another state cell;
  • use a timer to clear old unmatched records from state:
    • you may need to track element timestamps and a few other things manually;
  • if needed, apply the desired window/trigger to the output;
I suggest you read through this example; it does the timer and state part of what you need (it waits for matching records, keeps the unmatched records in state, and clears the state when the timer fires), and it uses a CoGroupByKey. Once you understand that example you should have a better idea of how this approach works.
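For completeness, here is one hypothetical way to feed two keyed streams into the JoinFn sketch above. It uses Flatten instead of CoGroupByKey, so all matching happens inside the stateful ParDo; stream1 and stream2 are assumed names for PCollection<KV<String, String>> inputs keyed by the join key and left in the global window.

// Tag each element with the stream it came from, flatten both streams into
// one keyed collection, then apply the stateful JoinFn from the sketch above.
PCollection<KV<String, KV<String, String>>> taggedLeft = stream1.apply("tagLeft",
    MapElements
        .into(TypeDescriptors.kvs(
            TypeDescriptors.strings(),
            TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings())))
        .via((KV<String, String> e) -> KV.of(e.getKey(), KV.of("left", e.getValue()))));

PCollection<KV<String, KV<String, String>>> taggedRight = stream2.apply("tagRight",
    MapElements
        .into(TypeDescriptors.kvs(
            TypeDescriptors.strings(),
            TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings())))
        .via((KV<String, String> e) -> KV.of(e.getKey(), KV.of("right", e.getValue()))));

PCollection<KV<String, String>> joined =
    PCollectionList.of(taggedLeft).and(taggedRight)
        .apply(Flatten.pCollections())
        .apply("manualJoin", ParDo.of(new JoinFn()));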