计算流管道中的不同值
Count distinct values in a stream pipeline
我的管道看起来像
pipeline.apply(PubsubIO.read.subscription("some subscription"))
.apply(Window.into(SlidingWindow.of(10 mins).every(20 seconds)
.triggering(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(20 seconds))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes()))
.apply(RemoveDuplicates.create())
.apply(Window.discardingFiredPanes()) // this is suggested in the warnings under https://cloud.google.com/dataflow/model/triggers#window-accumulation-modes
.apply(Count.<String>globally().withoutDefaults())
此管道显着高估了不同的值(20 倍正常值)。最初,我怀疑默认触发器可能导致了这个问题。我已经调整使用不允许 lateness/discard 触发 panes/use 处理时间的触发器,所有这些都具有类似的过度计数问题。
我也试过 ApproximateUnique.globally
:它在管道构造期间失败,因为看起来像这样的异常
Default values are not supported in Combine.globally() if the output PCollection is not windowed by GlobalWindows.
似乎无法向其中添加 withoutDefaults
(就像我们对 Count.globally
所做的那样)。
是否有推荐的方法在 dataflow/beam 流式传输管道中以合理的精度执行 COUNT(DISTINCT)
?
P.S。我正在使用 Java Dataflow SDK 1.9.0.
你的代码看起来没问题;它不应该多算。请注意,您将每个元素放入 30 windows,因此,如果您有一个 window-unaware 接收器(相当于折叠所有滑动 windows),您将期望正好是 30 倍的元素。如果您可以展示更多的管道或您如何观察计数,那可能会有所帮助。
除此之外,我对管道有一些建议:
- 我建议将
RemoveDuplicates
的触发器更改为 AfterPane.elementCountAtLeast(1)
;这将以较低的延迟为您提供相同的结果,因为稍后到达的元素不会产生任何影响。此触发器和您当前的触发器永远不会重复触发。所以设置 accumulatingFiredPanes()
或 discardingFiredPanes()
实际上并不重要。这很好,因为没有一个会与您的管道的其余部分一起工作。
- 我会在
Count
之前安装一个新触发器。原因有点技术性,但我会尝试描述它:
- 在您当前的管道中,安装在那里的触发器(
RemoveDuplicates
触发器的 "continuation trigger")记录第一个元素的到达时间并等待它收到所有生成的元素在 处理时间 或之前,由上游工作人员测量。存在一些不确定性,因为它双关本地处理时间和其他工作人员的处理时间。
- 如果您采纳我的建议并将触发器切换为
RemoveDuplicates
,那么继续触发器将为 AfterPane.elementCountAtLeast(1)
,因此它将始终尽快发出计数,然后丢弃更多数据,这大错特错。
我的管道看起来像
pipeline.apply(PubsubIO.read.subscription("some subscription"))
.apply(Window.into(SlidingWindow.of(10 mins).every(20 seconds)
.triggering(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(20 seconds))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes()))
.apply(RemoveDuplicates.create())
.apply(Window.discardingFiredPanes()) // this is suggested in the warnings under https://cloud.google.com/dataflow/model/triggers#window-accumulation-modes
.apply(Count.<String>globally().withoutDefaults())
此管道显着高估了不同的值(20 倍正常值)。最初,我怀疑默认触发器可能导致了这个问题。我已经调整使用不允许 lateness/discard 触发 panes/use 处理时间的触发器,所有这些都具有类似的过度计数问题。
我也试过 ApproximateUnique.globally
:它在管道构造期间失败,因为看起来像这样的异常
Default values are not supported in Combine.globally() if the output PCollection is not windowed by GlobalWindows.
似乎无法向其中添加 withoutDefaults
(就像我们对 Count.globally
所做的那样)。
是否有推荐的方法在 dataflow/beam 流式传输管道中以合理的精度执行 COUNT(DISTINCT)
?
P.S。我正在使用 Java Dataflow SDK 1.9.0.
你的代码看起来没问题;它不应该多算。请注意,您将每个元素放入 30 windows,因此,如果您有一个 window-unaware 接收器(相当于折叠所有滑动 windows),您将期望正好是 30 倍的元素。如果您可以展示更多的管道或您如何观察计数,那可能会有所帮助。
除此之外,我对管道有一些建议:
- 我建议将
RemoveDuplicates
的触发器更改为AfterPane.elementCountAtLeast(1)
;这将以较低的延迟为您提供相同的结果,因为稍后到达的元素不会产生任何影响。此触发器和您当前的触发器永远不会重复触发。所以设置accumulatingFiredPanes()
或discardingFiredPanes()
实际上并不重要。这很好,因为没有一个会与您的管道的其余部分一起工作。 - 我会在
Count
之前安装一个新触发器。原因有点技术性,但我会尝试描述它:- 在您当前的管道中,安装在那里的触发器(
RemoveDuplicates
触发器的 "continuation trigger")记录第一个元素的到达时间并等待它收到所有生成的元素在 处理时间 或之前,由上游工作人员测量。存在一些不确定性,因为它双关本地处理时间和其他工作人员的处理时间。 - 如果您采纳我的建议并将触发器切换为
RemoveDuplicates
,那么继续触发器将为AfterPane.elementCountAtLeast(1)
,因此它将始终尽快发出计数,然后丢弃更多数据,这大错特错。
- 在您当前的管道中,安装在那里的触发器(