如何 "rate-limit" Apache Beam 中的 PCollection?
How to "rate-limit" a PCollection in Apache Beam?
我遇到了一个看似常见的问题,但我无法弄清楚 Beam 推荐的解决方案是什么。
我有一个原始事件流,我正在寻找两个单独的事件来满足滑动 window(60 分钟)内的条件,以便 "trigger" 发出警报。
用 SlidingWindows
做起来很容易,但是问题是由于它的滑动性质,我有效地收到了多个 windows 潜在的警报。我如何最终获得仅输出一次此类警报的 PCollection(在特定 timeframe/cooldown 持续时间内)?
我最初认为最近的状态处理功能将是我的解决方案,但后来意识到它只适用于 window。侧输入也是如此。所以在我看来,我需要一种方法来打破 windows 并在一个(可能的 Session-)window 中处理警报 "firings"。但是文档没有提到任何有效地将元素重新分配给 new windows
的方法
有趣的应用程序!
总结一下:
- 听起来 "sliding window" 对于您的用例来说意味着连续滑动。您可以选择最小粒度,但这不一定是自然的。
- 您感兴趣的每组事件应该只产生一个输出。
有几种方法可以解决这个问题,具体取决于您的应用程序的其余部分。
一种方法是将数据留在全局 window 中并使用状态。您将不得不自己管理迟到 - 丢弃太晚的元素,处理乱序的数据等,并且通常保持您的状态有界。
另一种方法是使用滑动 windows 和 Combine
或状态(基本上,您已经尝试过的),然后重新 window 警报和重复数据删除.您可以为此使用 fixed windows,因为警报应该具有确定性时间戳; window 的末尾将控制何时自动收集状态,这样很方便。
我最终采用了重新windowing 策略,类似于@Kenn 的建议。
所以我从滑动 windowed 集合中收到警报,我将其重新window 到会话 windows
.apply(Window.remerge())
.apply(Window.into(Sessions.withGapDuration(Duration.standardHours(1))))
在那个 windowed 集合中,我可以只做一个 groupBy
,从而获得一个会话的所有 Alerts
,我可以在其中应用我的冷却逻辑,只发出一个每小时提醒一次。
我遇到了一个看似常见的问题,但我无法弄清楚 Beam 推荐的解决方案是什么。
我有一个原始事件流,我正在寻找两个单独的事件来满足滑动 window(60 分钟)内的条件,以便 "trigger" 发出警报。
用 SlidingWindows
做起来很容易,但是问题是由于它的滑动性质,我有效地收到了多个 windows 潜在的警报。我如何最终获得仅输出一次此类警报的 PCollection(在特定 timeframe/cooldown 持续时间内)?
我最初认为最近的状态处理功能将是我的解决方案,但后来意识到它只适用于 window。侧输入也是如此。所以在我看来,我需要一种方法来打破 windows 并在一个(可能的 Session-)window 中处理警报 "firings"。但是文档没有提到任何有效地将元素重新分配给 new windows
的方法有趣的应用程序!
总结一下:
- 听起来 "sliding window" 对于您的用例来说意味着连续滑动。您可以选择最小粒度,但这不一定是自然的。
- 您感兴趣的每组事件应该只产生一个输出。
有几种方法可以解决这个问题,具体取决于您的应用程序的其余部分。
一种方法是将数据留在全局 window 中并使用状态。您将不得不自己管理迟到 - 丢弃太晚的元素,处理乱序的数据等,并且通常保持您的状态有界。
另一种方法是使用滑动 windows 和 Combine
或状态(基本上,您已经尝试过的),然后重新 window 警报和重复数据删除.您可以为此使用 fixed windows,因为警报应该具有确定性时间戳; window 的末尾将控制何时自动收集状态,这样很方便。
我最终采用了重新windowing 策略,类似于@Kenn 的建议。
所以我从滑动 windowed 集合中收到警报,我将其重新window 到会话 windows
.apply(Window.remerge())
.apply(Window.into(Sessions.withGapDuration(Duration.standardHours(1))))
在那个 windowed 集合中,我可以只做一个 groupBy
,从而获得一个会话的所有 Alerts
,我可以在其中应用我的冷却逻辑,只发出一个每小时提醒一次。