缓冲和刷新 Apache Beam 流数据

Buffer and flush Apache Beam streaming data

我有一个初始 运行 需要处理大量数据的流媒体作业。 DoFn 之一调用支持批处理请求的远程服务,因此在使用有界集合时,我使用以下方法:

  private static final class Function extends DoFn<String, Void> implements Serializable {
    private static final long serialVersionUID = 2417984990958377700L;

    private static final int LIMIT = 500;

    private transient Queue<String> buffered;

    @StartBundle
    public void startBundle(Context context) throws Exception {
      buffered = new LinkedList<>();
    }

    @ProcessElement
    public void processElement(ProcessContext context) throws Exception {
      buffered.add(context.element());

      if (buffered.size() > LIMIT) {
        flush();
      }
    }

    @FinishBundle
    public void finishBundle(Context c) throws Exception {
      // process remaining
      flush();
    }

    private void flush() {
      // build batch request
      while (!buffered.isEmpty()) {
        buffered.poll();
        // do something
      }
    }
  }

有没有办法 window 数据以便可以在无界集合上使用相同的方法?

我试过以下方法:

pipeline
    .apply("Read", Read.from(source))
    .apply(WithTimestamps.of(input -> Instant.now()))
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2L))))
    .apply("Process", ParDo.of(new Function()));

但是每个元素都会调用 startBundlefinishBundle。是否有机会使用 RxJava(2 分钟 windows 或 100 个元素包):

source
    .toFlowable(BackpressureStrategy.LATEST)
    .buffer(2, TimeUnit.MINUTES, 100) 

这是 per-key-and-windows 新功能的典型用例 state and timers

状态在a Beam blog post, while for timers you'll have to rely on the Javadoc. Nevermind what the javadoc says about runners supporting them, the true status is found in Beam's capability matrix中描述。

该模式与您所写的非常相似,但状态允许它与 windows 一起工作,也可以跨包工作,因为它们在流式传输中可能非常小。由于必须以某种方式对状态进行分区以保持并行性,因此您需要添加某种键。目前没有自动分片。

private static final class Function extends DoFn<KV<Key, String>, Void> implements Serializable {
  private static final long serialVersionUID = 2417984990958377700L;

  private static final int LIMIT = 500;

  @StateId("bufferedSize")
  private final StateSpec<Object, ValueState<Integer>> bufferedSizeSpec =
      StateSpecs.value(VarIntCoder.of());

  @StateId("buffered")
  private final StateSpec<Object, BagState<String>> bufferedSpec =
      StateSpecs.bag(StringUtf8Coder.of());

  @TimerId("expiry")
  private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void processElement(
      ProcessContext context,
      BoundedWindow window,
      @StateId("bufferedSize") ValueState<Integer> bufferedSizeState,
      @StateId("buffered") BagState<String> bufferedState,
      @TimerId("expiry") Timer expiryTimer) {

    int size = firstNonNull(bufferedSizeState.read(), 0);
    bufferedState.add(context.element().getValue());
    size += 1;
    bufferedSizeState.write(size);
    expiryTimer.set(window.maxTimestamp().plus(allowedLateness));

    if (size > LIMIT) {
      flush(context, bufferedState, bufferedSizeState);
    }
  }

  @OnTimer("expiry")
  public void onExpiry(
      OnTimerContext context,
      @StateId("bufferedSize") ValueState<Integer> bufferedSizeState,
      @StateId("buffered") BagState<String> bufferedState) {
    flush(context, bufferedState, bufferedSizeState);
  }

  private void flush(
      WindowedContext context,
      BagState<String> bufferedState,
      ValueState<Integer> bufferedSizeState) {
    Iterable<String> buffered = bufferedState.read();

    // build batch request from buffered
    ...

    // clear things
    bufferedState.clear();
    bufferedSizeState.clear();
  }
}

在这里做一些笔记:

  • State 替换了您的 DoFn 的实例变量,因为 实例变量在 windows.
  • 之间没有内聚性
  • 缓冲区和大小只是根据需要初始化 @StartBundle.
  • BagState支持"blind"写入,所以不需要 任何读-修改-写,只需在同一个文件中提交新元素 就像你输出时的方式。
  • 同一时间重复设置一个定时器就好了; 它应该主要是一个空洞。
  • @OnTimer("expiry") 取代了 @FinishBundle,因为 完成捆绑包不是每个 window 的事情,而是 跑步者如何执行您的管道。

综上所述,如果您正在写入外部系统,也许您希望在写入之前将 windows 和重新 window 重新定义为全局 window您的书写方式取决于 window,因为 "the external world is globally windowed".

apache beam 0.6.0 的文档说 StateId 是 "Not currently supported by any runner."