计算流管道中的不同值

Count distinct values in a stream pipeline

我的管道看起来像

pipeline.apply(PubsubIO.read.subscription("some subscription"))
            .apply(Window.into(SlidingWindow.of(10 mins).every(20 seconds)
                            .triggering(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(20 seconds))
                    .withAllowedLateness(Duration.ZERO)
                    .accumulatingFiredPanes()))
            .apply(RemoveDuplicates.create())
            .apply(Window.discardingFiredPanes()) // this is suggested in the warnings under https://cloud.google.com/dataflow/model/triggers#window-accumulation-modes
            .apply(Count.<String>globally().withoutDefaults())

此管道显着高估了不同的值(20 倍正常值)。最初,我怀疑默认触发器可能导致了这个问题。我已经调整使用不允许 lateness/discard 触发 panes/use 处理时间的触发器,所有这些都具有类似的过度计数问题。

我也试过 ApproximateUnique.globally:它在管道构造期间失败,因为看起来像这样的异常 Default values are not supported in Combine.globally() if the output PCollection is not windowed by GlobalWindows. 似乎无法向其中添加 withoutDefaults(就像我们对 Count.globally 所做的那样)。

是否有推荐的方法在 dataflow/beam 流式传输管道中以合理的精度执行 COUNT(DISTINCT)

P.S。我正在使用 Java Dataflow SDK 1.9.0.

你的代码看起来没问题;它不应该多算。请注意,您将每个元素放入 30 windows,因此,如果您有一个 window-unaware 接收器(相当于折叠所有滑动 windows),您将期望正好是 30 倍的元素。如果您可以展示更多的管道或您如何观察计数,那可能会有所帮助。

除此之外,我对管道有一些建议:

  • 我建议将 RemoveDuplicates 的触发器更改为 AfterPane.elementCountAtLeast(1);这将以较低的延迟为您提供相同的结果,因为稍后到达的元素不会产生任何影响。此触发器和您当前的触发器永远不会重复触发。所以设置 accumulatingFiredPanes()discardingFiredPanes() 实际上并不重要。这很好,因为没有一个会与您的管道的其余部分一起工作。
  • 我会在 Count 之前安装一个新触发器。原因有点技术性,但我会尝试描述它:
    • 在您当前的管道中,安装在那里的触发器(RemoveDuplicates 触发器的 "continuation trigger")记录第一个元素的到达时间并等待它收到所有生成的元素在 处理时间 或之前,由上游工作人员测量。存在一些不确定性,因为它双关本地处理时间和其他工作人员的处理时间。
    • 如果您采纳我的建议并将触发器切换为 RemoveDuplicates,那么继续触发器将为 AfterPane.elementCountAtLeast(1),因此它将始终尽快发出计数,然后丢弃更多数据,这大错特错。