Apache Beam Stateful DoFn 定期输出所有 K/V 对

Apache Beam Stateful DoFn Periodically Output All K/V Pairs

我正在尝试使用有状态的 DoFn(使用 @ProcessElement@StateId ValueState 元素在 Apache Beam 中(通过 Scio)聚合(每个键)流数据源.我认为这最适合我要解决的问题。要求是:

鉴于这是一个流媒体管道并且将无限期 运行ning,在全局 window 上使用 combinePerKey 并累积触发的窗格似乎会继续增加其随着时间的推移,内存占用和它需要 运行 的数据量,所以我想避免它。此外,在对此进行测试时,(可能如预期的那样)它只是将新计算的聚合与历史输入一起附加到输出,而不是为每个键使用最新值。

我的想法是,使用 StatefulDoFn 只会让我输出到现在为止的所有全局状态 (),但这似乎不是一个简单的解决方案。我已经看到有关使用计时器为此人为执行回调的暗示,以及可能使用缓慢增长的侧输入映射()并以某种方式刷新它,但这基本上需要迭代映射中的所有值而不是加入它。

我觉得我可能忽略了一些简单的东西来让它工作。我对 Beam 中 windowing 和计时器的许多概念还比较陌生,正在寻找有关如何解决此问题的任何建议。谢谢!

你说得对,Stateful DoFn 应该可以帮助你。这是您可以做什么的基本草图。请注意,这仅输出没有密钥的总和。它可能不是你想要的,但它应该能帮助你前进。

class CombiningEmittingFn extends DoFn<KV<Integer, Integer>, Integer> {

  @TimerId("emitter")
  private final TimerSpec emitterSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @StateId("done")
  private final StateSpec<ValueState<Boolean>> doneState = StateSpecs.value();

  @StateId("agg")
  private final StateSpec<CombiningState<Integer, int[], Integer>>
      aggSpec = StateSpecs.combining(
          Sum.ofIntegers().getAccumulatorCoder(null, VarIntCoder.of()), Sum.ofIntegers());

  @ProcessElement
  public void processElement(ProcessContext c,
      @StateId("agg") CombiningState<Integer, int[], Integer> aggState,
      @StateId("done") ValueState<Boolean> doneState,
      @TimerId("emitter") Timer emitterTimer) throws Exception {
        if (SOME CONDITION) {
          countValueState.clear();
          doneState.write(true);
        } else {
          countValueState.addAccum(c.element().getValue());
          emitterTimer.align(Duration.standardMinutes(5)).setRelative();
        }
      }
    }

  @OnTimer("emitter")
  public void onEmit(
      OnTimerContext context,
      @StateId("agg") CombiningState<Integer, int[], Integer> aggState,
      @StateId("done") ValueState<Boolean> doneState,
      @TimerId("emitter") Timer emitterTimer) {
      Boolean isDone = doneState.read();
      if (isDone != null && isDone) {
        return;
      } else {
        context.output(aggState.getAccum());
        // Set the timer to emit again
        emitterTimer.align(Duration.standardMinutes(5)).setRelative();
      }
    }
  }
  }

很高兴与您反复讨论可行的方法。

@Pablo 确实是正确的,StatefulDoFn 和计时器在这种情况下很有用。这是我能够开始工作的代码。

有状态 Do Fn

// DomainState is a custom case class I'm using
type DoFnT = DoFn[KV[String, DomainState], KV[String, DomainState]]

class StatefulDoFn extends DoFnT {

  @StateId("key")
  private val keySpec = StateSpecs.value[String]()

  @StateId("domainState")
  private val domainStateSpec = StateSpecs.value[DomainState]()

  @TimerId("loopingTimer")
  private val loopingTimer: TimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME)

  @ProcessElement
  def process(
      context: DoFnT#ProcessContext,
      @StateId("key") stateKey: ValueState[String],
      @StateId("domainState") stateValue: ValueState[DomainState],
      @TimerId("loopingTimer") loopingTimer: Timer): Unit = {

    ... logic to create key/value from potentially null values

    if (keepState(value)) {
      loopingTimer.align(Duration.standardMinutes(5)).setRelative()

      stateKey.write(key)
      stateValue.write(value)

      if (flushState(value)) {
        context.output(KV.of(key, value))
      }
    } else {
      stateValue.clear()
    }
  }

  @OnTimer("loopingTimer")
  def onLoopingTimer(
      context: DoFnT#OnTimerContext,
      @StateId("key") stateKey: ValueState[String],
      @StateId("domainState") stateValue: ValueState[DomainState],
      @TimerId("loopingTimer") loopingTimer: Timer): Unit = {

    ... logic to create key/value checking for nulls

    if (keepState(value)) {

      loopingTimer.align(Duration.standardMinutes(5)).setRelative()

      if (flushState(value)) {
        context.output(KV.of(key, value))
      }
    }
  }
}

有管道

sc
  .pubsubSubscription(...)
  .keyBy(...)
  .withGlobalWindow()
  .applyPerKeyDoFn(new StatefulDoFn())
  .withFixedWindows(
    duration = Duration.standardMinutes(5),
    options = WindowOptions(
      accumulationMode = DISCARDING_FIRED_PANES,
      trigger = AfterWatermark.pastEndOfWindow(),
      allowedLateness = Duration.ZERO,
      // Only take the latest per key during a window
      timestampCombiner = TimestampCombiner.END_OF_WINDOW
    ))
  .reduceByKey(mostRecentEvent())
  .saveAsCustomOutput(TextIO.write()...)