Apache beam:状态规范中的 TTL
Apache beam: TTL in State Spec
我们正在从 Kinesis 读取并写入 parquet,我们使用 StateSpec<ValueState<Boolean>>
来避免在从最后一个保存点正常停止并重新启动我们的管道后重复处理记录。
我们看到一些记录是重复的,因为它们最终会在后续重新启动时落入不同的任务管理器,我们使用 StateSpec<ValueState<Boolean>>
来存储有关已处理记录的状态信息并避免重复。
我们正在处理如何每隔一定时间清除一次状态而不会丢失最近处理的记录的风险,如果它们在即将到来的停止中需要的话。 (即,我们需要一个类似 TTL 的东西 class)。
我们想过一个定时器,每隔一定时间清除一次状态,但不符合我们的要求,因为我们需要保留最近处理的记录。
我们读到 here 使用事件时间处理会在 window 过期后自动清除状态信息,我们想知道这是否符合我们使用 StateSpec
class.
否则,有没有class存储状态,有一种TTL来实现这个功能?
我们现在有的是这段检查元素是否已经处理过的代码和一个每隔一定时间清除状态的方法
@StateId("keyPreserved")
private final StateSpec<ValueState<Boolean>> keyPreserved = StateSpecs.value(BooleanCoder.of());
@TimerId("resetStateTimer")
private final TimerSpec resetStateTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
public void processElement(ProcessContext context,
@TimerId("resetStateTimer") Timer resetStateTimer,
@StateId("keyPreserved") ValueState<Boolean> keyPreservedState) {
if (!firstNonNull(keyPreservedState.read(), false)) {
T message = context.element().getValue();
//Process element here
keyPreservedState.write(true);
}
}
@OnTimer("resetStateTimer")
public void onResetStateTimer(OnTimerContext context,
@StateId("keyPreserved") ValueState<Boolean> keyPreservedState) {
keyPreservedState.clear();
}
每次调用 keyPreservedState.write(true);
时设置定时器就足够了。当计时器到期时 keyPreservedState.clear();
只清除上下文中的元素,而不是整个状态。
我们正在从 Kinesis 读取并写入 parquet,我们使用 StateSpec<ValueState<Boolean>>
来避免在从最后一个保存点正常停止并重新启动我们的管道后重复处理记录。
我们看到一些记录是重复的,因为它们最终会在后续重新启动时落入不同的任务管理器,我们使用 StateSpec<ValueState<Boolean>>
来存储有关已处理记录的状态信息并避免重复。
我们正在处理如何每隔一定时间清除一次状态而不会丢失最近处理的记录的风险,如果它们在即将到来的停止中需要的话。 (即,我们需要一个类似 TTL 的东西 class)。
我们想过一个定时器,每隔一定时间清除一次状态,但不符合我们的要求,因为我们需要保留最近处理的记录。
我们读到 here 使用事件时间处理会在 window 过期后自动清除状态信息,我们想知道这是否符合我们使用 StateSpec
class.
否则,有没有class存储状态,有一种TTL来实现这个功能?
我们现在有的是这段检查元素是否已经处理过的代码和一个每隔一定时间清除状态的方法
@StateId("keyPreserved")
private final StateSpec<ValueState<Boolean>> keyPreserved = StateSpecs.value(BooleanCoder.of());
@TimerId("resetStateTimer")
private final TimerSpec resetStateTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
public void processElement(ProcessContext context,
@TimerId("resetStateTimer") Timer resetStateTimer,
@StateId("keyPreserved") ValueState<Boolean> keyPreservedState) {
if (!firstNonNull(keyPreservedState.read(), false)) {
T message = context.element().getValue();
//Process element here
keyPreservedState.write(true);
}
}
@OnTimer("resetStateTimer")
public void onResetStateTimer(OnTimerContext context,
@StateId("keyPreserved") ValueState<Boolean> keyPreservedState) {
keyPreservedState.clear();
}
每次调用 keyPreservedState.write(true);
时设置定时器就足够了。当计时器到期时 keyPreservedState.clear();
只清除上下文中的元素,而不是整个状态。