Apache Beam Stateful DoFn 定期输出所有 K/V 对
Apache Beam Stateful DoFn Periodically Output All K/V Pairs
我正在尝试使用有状态的 DoFn(使用 @ProcessElement
和 @StateId
ValueState
元素在 Apache Beam 中(通过 Scio)聚合(每个键)流数据源.我认为这最适合我要解决的问题。要求是:
- 对于给定的键,所有时间的记录都会聚合(基本上是求和)- 我不关心以前计算的聚合,只关心最近的
- 根据我控制的特定条件
,密钥可能会从状态(state.clear()
)中被逐出
- 每 5 分钟,无论是否看到任何新密钥,所有未从状态
中驱逐的密钥 都应该输出
鉴于这是一个流媒体管道并且将无限期 运行ning,在全局 window 上使用 combinePerKey
并累积触发的窗格似乎会继续增加其随着时间的推移,内存占用和它需要 运行 的数据量,所以我想避免它。此外,在对此进行测试时,(可能如预期的那样)它只是将新计算的聚合与历史输入一起附加到输出,而不是为每个键使用最新值。
我的想法是,使用 StatefulDoFn 只会让我输出到现在为止的所有全局状态 (),但这似乎不是一个简单的解决方案。我已经看到有关使用计时器为此人为执行回调的暗示,以及可能使用缓慢增长的侧输入映射()并以某种方式刷新它,但这基本上需要迭代映射中的所有值而不是加入它。
我觉得我可能忽略了一些简单的东西来让它工作。我对 Beam 中 windowing 和计时器的许多概念还比较陌生,正在寻找有关如何解决此问题的任何建议。谢谢!
你说得对,Stateful DoFn 应该可以帮助你。这是您可以做什么的基本草图。请注意,这仅输出没有密钥的总和。它可能不是你想要的,但它应该能帮助你前进。
class CombiningEmittingFn extends DoFn<KV<Integer, Integer>, Integer> {
@TimerId("emitter")
private final TimerSpec emitterSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
@StateId("done")
private final StateSpec<ValueState<Boolean>> doneState = StateSpecs.value();
@StateId("agg")
private final StateSpec<CombiningState<Integer, int[], Integer>>
aggSpec = StateSpecs.combining(
Sum.ofIntegers().getAccumulatorCoder(null, VarIntCoder.of()), Sum.ofIntegers());
@ProcessElement
public void processElement(ProcessContext c,
@StateId("agg") CombiningState<Integer, int[], Integer> aggState,
@StateId("done") ValueState<Boolean> doneState,
@TimerId("emitter") Timer emitterTimer) throws Exception {
if (SOME CONDITION) {
countValueState.clear();
doneState.write(true);
} else {
countValueState.addAccum(c.element().getValue());
emitterTimer.align(Duration.standardMinutes(5)).setRelative();
}
}
}
@OnTimer("emitter")
public void onEmit(
OnTimerContext context,
@StateId("agg") CombiningState<Integer, int[], Integer> aggState,
@StateId("done") ValueState<Boolean> doneState,
@TimerId("emitter") Timer emitterTimer) {
Boolean isDone = doneState.read();
if (isDone != null && isDone) {
return;
} else {
context.output(aggState.getAccum());
// Set the timer to emit again
emitterTimer.align(Duration.standardMinutes(5)).setRelative();
}
}
}
}
很高兴与您反复讨论可行的方法。
@Pablo 确实是正确的,StatefulDoFn 和计时器在这种情况下很有用。这是我能够开始工作的代码。
有状态 Do Fn
// DomainState is a custom case class I'm using
type DoFnT = DoFn[KV[String, DomainState], KV[String, DomainState]]
class StatefulDoFn extends DoFnT {
@StateId("key")
private val keySpec = StateSpecs.value[String]()
@StateId("domainState")
private val domainStateSpec = StateSpecs.value[DomainState]()
@TimerId("loopingTimer")
private val loopingTimer: TimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME)
@ProcessElement
def process(
context: DoFnT#ProcessContext,
@StateId("key") stateKey: ValueState[String],
@StateId("domainState") stateValue: ValueState[DomainState],
@TimerId("loopingTimer") loopingTimer: Timer): Unit = {
... logic to create key/value from potentially null values
if (keepState(value)) {
loopingTimer.align(Duration.standardMinutes(5)).setRelative()
stateKey.write(key)
stateValue.write(value)
if (flushState(value)) {
context.output(KV.of(key, value))
}
} else {
stateValue.clear()
}
}
@OnTimer("loopingTimer")
def onLoopingTimer(
context: DoFnT#OnTimerContext,
@StateId("key") stateKey: ValueState[String],
@StateId("domainState") stateValue: ValueState[DomainState],
@TimerId("loopingTimer") loopingTimer: Timer): Unit = {
... logic to create key/value checking for nulls
if (keepState(value)) {
loopingTimer.align(Duration.standardMinutes(5)).setRelative()
if (flushState(value)) {
context.output(KV.of(key, value))
}
}
}
}
有管道
sc
.pubsubSubscription(...)
.keyBy(...)
.withGlobalWindow()
.applyPerKeyDoFn(new StatefulDoFn())
.withFixedWindows(
duration = Duration.standardMinutes(5),
options = WindowOptions(
accumulationMode = DISCARDING_FIRED_PANES,
trigger = AfterWatermark.pastEndOfWindow(),
allowedLateness = Duration.ZERO,
// Only take the latest per key during a window
timestampCombiner = TimestampCombiner.END_OF_WINDOW
))
.reduceByKey(mostRecentEvent())
.saveAsCustomOutput(TextIO.write()...)
我正在尝试使用有状态的 DoFn(使用 @ProcessElement
和 @StateId
ValueState
元素在 Apache Beam 中(通过 Scio)聚合(每个键)流数据源.我认为这最适合我要解决的问题。要求是:
- 对于给定的键,所有时间的记录都会聚合(基本上是求和)- 我不关心以前计算的聚合,只关心最近的
- 根据我控制的特定条件 ,密钥可能会从状态(
- 每 5 分钟,无论是否看到任何新密钥,所有未从状态 中驱逐的密钥 都应该输出
state.clear()
)中被逐出
鉴于这是一个流媒体管道并且将无限期 运行ning,在全局 window 上使用 combinePerKey
并累积触发的窗格似乎会继续增加其随着时间的推移,内存占用和它需要 运行 的数据量,所以我想避免它。此外,在对此进行测试时,(可能如预期的那样)它只是将新计算的聚合与历史输入一起附加到输出,而不是为每个键使用最新值。
我的想法是,使用 StatefulDoFn 只会让我输出到现在为止的所有全局状态 (),但这似乎不是一个简单的解决方案。我已经看到有关使用计时器为此人为执行回调的暗示,以及可能使用缓慢增长的侧输入映射(
我觉得我可能忽略了一些简单的东西来让它工作。我对 Beam 中 windowing 和计时器的许多概念还比较陌生,正在寻找有关如何解决此问题的任何建议。谢谢!
你说得对,Stateful DoFn 应该可以帮助你。这是您可以做什么的基本草图。请注意,这仅输出没有密钥的总和。它可能不是你想要的,但它应该能帮助你前进。
class CombiningEmittingFn extends DoFn<KV<Integer, Integer>, Integer> {
@TimerId("emitter")
private final TimerSpec emitterSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
@StateId("done")
private final StateSpec<ValueState<Boolean>> doneState = StateSpecs.value();
@StateId("agg")
private final StateSpec<CombiningState<Integer, int[], Integer>>
aggSpec = StateSpecs.combining(
Sum.ofIntegers().getAccumulatorCoder(null, VarIntCoder.of()), Sum.ofIntegers());
@ProcessElement
public void processElement(ProcessContext c,
@StateId("agg") CombiningState<Integer, int[], Integer> aggState,
@StateId("done") ValueState<Boolean> doneState,
@TimerId("emitter") Timer emitterTimer) throws Exception {
if (SOME CONDITION) {
countValueState.clear();
doneState.write(true);
} else {
countValueState.addAccum(c.element().getValue());
emitterTimer.align(Duration.standardMinutes(5)).setRelative();
}
}
}
@OnTimer("emitter")
public void onEmit(
OnTimerContext context,
@StateId("agg") CombiningState<Integer, int[], Integer> aggState,
@StateId("done") ValueState<Boolean> doneState,
@TimerId("emitter") Timer emitterTimer) {
Boolean isDone = doneState.read();
if (isDone != null && isDone) {
return;
} else {
context.output(aggState.getAccum());
// Set the timer to emit again
emitterTimer.align(Duration.standardMinutes(5)).setRelative();
}
}
}
}
很高兴与您反复讨论可行的方法。
@Pablo 确实是正确的,StatefulDoFn 和计时器在这种情况下很有用。这是我能够开始工作的代码。
有状态 Do Fn
// DomainState is a custom case class I'm using
type DoFnT = DoFn[KV[String, DomainState], KV[String, DomainState]]
class StatefulDoFn extends DoFnT {
@StateId("key")
private val keySpec = StateSpecs.value[String]()
@StateId("domainState")
private val domainStateSpec = StateSpecs.value[DomainState]()
@TimerId("loopingTimer")
private val loopingTimer: TimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME)
@ProcessElement
def process(
context: DoFnT#ProcessContext,
@StateId("key") stateKey: ValueState[String],
@StateId("domainState") stateValue: ValueState[DomainState],
@TimerId("loopingTimer") loopingTimer: Timer): Unit = {
... logic to create key/value from potentially null values
if (keepState(value)) {
loopingTimer.align(Duration.standardMinutes(5)).setRelative()
stateKey.write(key)
stateValue.write(value)
if (flushState(value)) {
context.output(KV.of(key, value))
}
} else {
stateValue.clear()
}
}
@OnTimer("loopingTimer")
def onLoopingTimer(
context: DoFnT#OnTimerContext,
@StateId("key") stateKey: ValueState[String],
@StateId("domainState") stateValue: ValueState[DomainState],
@TimerId("loopingTimer") loopingTimer: Timer): Unit = {
... logic to create key/value checking for nulls
if (keepState(value)) {
loopingTimer.align(Duration.standardMinutes(5)).setRelative()
if (flushState(value)) {
context.output(KV.of(key, value))
}
}
}
}
有管道
sc
.pubsubSubscription(...)
.keyBy(...)
.withGlobalWindow()
.applyPerKeyDoFn(new StatefulDoFn())
.withFixedWindows(
duration = Duration.standardMinutes(5),
options = WindowOptions(
accumulationMode = DISCARDING_FIRED_PANES,
trigger = AfterWatermark.pastEndOfWindow(),
allowedLateness = Duration.ZERO,
// Only take the latest per key during a window
timestampCombiner = TimestampCombiner.END_OF_WINDOW
))
.reduceByKey(mostRecentEvent())
.saveAsCustomOutput(TextIO.write()...)