带有 GlobalWindow 的 Beam 中的状态垃圾收集
State garbage collection in Beam with GlobalWindow
Apache Beam 最近通过 StateSpec
和 @StateId
注释引入了 state cells,部分支持 Apache Flink 和 Google Cloud Dataflow。
我找不到任何关于将其与 GlobalWindow
一起使用时会发生什么的文档。特别是,有没有一种方法可以让 "state garbage collection" 机制根据某些配置摆脱一段时间未见的键的状态,同时仍然保持可见的键的单一历史状态够频繁吗?
或者,在这种情况下使用的状态量是否会发生分歧,无法回收与一段时间未见的键对应的状态?
我还对 Apache Flink 或 Google Cloud Dataflow 是否支持潜在解决方案感兴趣。
Flink 和 direct runners 似乎有一些 "state GC" 的代码,但我不太确定它的作用以及它在使用全局 window.
时是否相关
如果您使用 GlobalWindows
,则不会自动收集状态垃圾。仅当您使用一些非全局 window 时,水印通过 window 的末尾加上允许的迟到后,状态才会被垃圾收集。
如果必须使用 GlobalWindows
,您可以做的是手动保持 last update timestamp
的状态。然后你会定期设置一个计时器,你可以根据当前时间检查这个时间戳,并在必要时删除状态。您将在第一次遇到密钥时设置此计时器(您可以从缺少时间戳状态中看出),然后在 @OnTimer
方法中重新设置它。
状态可以在 window 过期后的某个时刻由 Beam runner 自动进行垃圾收集 - 当输入水印超过 window 的末尾允许的延迟时,因此所有进一步的输入是可丢弃的。具体细节取决于跑步者。
正如您正确判断的那样,全局 window 可能永远不会过期。那么这个状态的自动收集就不会被调用。对于有界数据,包括 drain 场景,它实际上会过期,但对于永久无界数据源,它不会。
如果您在全局 window 中对此类数据进行有状态处理,您可以使用用户定义的计时器(通过 @TimerId
、@OnTimer
和 TimerSpec
- 我还没有写过关于这些的博客)以在您选择的超时后清除状态。如果状态代表某种聚合,那么您无论如何都需要一个计时器来确保您的数据不会滞留在状态中。
下面是一个简单的使用示例:
new DoFn<Foo, Baz>() {
private static final String MY_TIMER = "my-timer";
private static final String MY_STATE = "my-state";
@StateId(MY_STATE)
private final StateSpec<ValueState<Bizzle>> =
StateSpec.value(Bizzle.coder());
@TimerId(MY_TIMER)
private final TimerSpec myTimer =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
public void process(
ProcessContext c,
@StateId(MY_STATE) ValueState<Bizzle> bizzleState,
@TimerId(MY_TIMER) Timer myTimer) {
bizzleState.write(...);
myTimer.setForNowPlus(...);
}
@OnTimer(MY_TIMER)
public void onMyTimer(
OnTimerContext context,
@StateId(MY_STATE) ValueState<Bizzle> bizzleState) {
context.output(... bizzleState.read() ...);
bizzleState.clear();
}
}
Apache Beam 最近通过 StateSpec
和 @StateId
注释引入了 state cells,部分支持 Apache Flink 和 Google Cloud Dataflow。
我找不到任何关于将其与 GlobalWindow
一起使用时会发生什么的文档。特别是,有没有一种方法可以让 "state garbage collection" 机制根据某些配置摆脱一段时间未见的键的状态,同时仍然保持可见的键的单一历史状态够频繁吗?
或者,在这种情况下使用的状态量是否会发生分歧,无法回收与一段时间未见的键对应的状态?
我还对 Apache Flink 或 Google Cloud Dataflow 是否支持潜在解决方案感兴趣。
Flink 和 direct runners 似乎有一些 "state GC" 的代码,但我不太确定它的作用以及它在使用全局 window.
时是否相关如果您使用 GlobalWindows
,则不会自动收集状态垃圾。仅当您使用一些非全局 window 时,水印通过 window 的末尾加上允许的迟到后,状态才会被垃圾收集。
如果必须使用 GlobalWindows
,您可以做的是手动保持 last update timestamp
的状态。然后你会定期设置一个计时器,你可以根据当前时间检查这个时间戳,并在必要时删除状态。您将在第一次遇到密钥时设置此计时器(您可以从缺少时间戳状态中看出),然后在 @OnTimer
方法中重新设置它。
状态可以在 window 过期后的某个时刻由 Beam runner 自动进行垃圾收集 - 当输入水印超过 window 的末尾允许的延迟时,因此所有进一步的输入是可丢弃的。具体细节取决于跑步者。
正如您正确判断的那样,全局 window 可能永远不会过期。那么这个状态的自动收集就不会被调用。对于有界数据,包括 drain 场景,它实际上会过期,但对于永久无界数据源,它不会。
如果您在全局 window 中对此类数据进行有状态处理,您可以使用用户定义的计时器(通过 @TimerId
、@OnTimer
和 TimerSpec
- 我还没有写过关于这些的博客)以在您选择的超时后清除状态。如果状态代表某种聚合,那么您无论如何都需要一个计时器来确保您的数据不会滞留在状态中。
下面是一个简单的使用示例:
new DoFn<Foo, Baz>() {
private static final String MY_TIMER = "my-timer";
private static final String MY_STATE = "my-state";
@StateId(MY_STATE)
private final StateSpec<ValueState<Bizzle>> =
StateSpec.value(Bizzle.coder());
@TimerId(MY_TIMER)
private final TimerSpec myTimer =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
public void process(
ProcessContext c,
@StateId(MY_STATE) ValueState<Bizzle> bizzleState,
@TimerId(MY_TIMER) Timer myTimer) {
bizzleState.write(...);
myTimer.setForNowPlus(...);
}
@OnTimer(MY_TIMER)
public void onMyTimer(
OnTimerContext context,
@StateId(MY_STATE) ValueState<Bizzle> bizzleState) {
context.output(... bizzleState.read() ...);
bizzleState.clear();
}
}