使用 Apache Beam 按键处理事件的总排序

Processing Total Ordering of Events By Key using Apache Beam

问题背景

我正在尝试从实时流中为每个键生成事件项的总(线性)顺序,其中顺序是事件时间(从事件有效负载派生)。

方法

我曾尝试使用流式传输实现此功能,如下所示:

1) 设置一个非重叠顺序windows,例如时长 5 分钟

2) 建立一个允许的迟到 - 丢弃迟到的事件是可以的

3) 设置累积模式以保留所有触发的窗格

4) 使用"AfterwaterMark"触发器

5) 处理触发的窗格时,只考虑最后一个窗格

6) 使用 GroupBy.perKey 确保此键的 window 中的所有事件都将在单个资源上作为一个单元处理

虽然这种方法确保了给定 window 中每个键的线性顺序,但它并不能保证跨越多个 windows,例如可能有一个 window 键的事件发生在与之前的 window 同时处理之后,如果第一个 window 失败并且不得不重试。

我正在考虑采用这种方法,首先可以处理实时流,以便它按键对事件进行分区,并将它们写入由 window 范围命名的文件中。 由于光束处理的并行性质,这些文件也会乱序生成。 然后,单个流程协调员可以将这些文件按顺序提交到批处理管道 - 只有在收到前一个文件并且下游处理成功完成后才提交下一个文件。

问题是 Apache Beam 只会在那个时间至少有一个时间元素时触发一个窗格 window。因此,如果事件中存在间隙,则生成的文件中可能存在间隙 - 即丢失文件。丢失文件的问题是,协调批处理处理器无法区分是否知道时间 window 已经过去但没有数据,或者如果出现故障,在文件最终到达之前它无法继续。

强制触发事件 windows 的一种方法可能是以某种方式将虚拟事件添加到每个分区和时间 window 的流中。然而,这很棘手......如果时间序列中存在很大的差距,那么如果这些虚拟事件发生的时间很晚,那么它们将被视为迟到而被丢弃。

是否有其他方法确保每个可能的事件都有触发器 window,即使这会导致输出空文件?

从实时流中按键生成总排序是否是 Apache Beam 易于处理的问题?我应该考虑另一种方法吗?

根据您对易处理的定义,在 Apache Beam 中完全可以按事件时间戳对每个键的流进行排序。

以下是设计背后的考虑因素:

  1. Apache Beam 不保证按顺序传输,因此在管道内没有用处。所以我假设您正在这样做,这样您就可以写入外部系统,并且只有按顺序处理的能力。
  2. 如果一个事件有时间戳 t,你永远不能确定没有更早的事件会到达,除非你等到 t 是可删除的。

所以我们将这样做:

  1. 我们将在全局 window 中编写一个使用 state and timers (blog post still under review) 的 ParDo。这使它成为一个按键工作流。
  2. 我们将在元素到达时缓冲状态中的元素。所以你允许的迟到会影响你需要的数据结构的效率。你需要的是一个堆来查看和弹出最小时间戳和元素;没有内置的堆状态,所以我将它写成 ValueState.
  3. 我们将设置一个 事件时间 计时器以在元素的时间戳不再矛盾时接收回调。

为简洁起见,我将采用自定义 EventHeap 数据结构。在实践中,您希望将其分解为多个状态单元以最小化传输的数据。堆可能是对原始状态类型的合理补充。

我还假设我们需要的所有编码器都已经注册并专注于状态和计时器逻辑。

new DoFn<KV<K, Event>, Void>() {

  @StateId("heap")
  private final StateSpec<ValueState<EventHeap>> heapSpec = StateSpecs.value();

  @TimerId("next")
  private final TimerSpec nextTimerSpec = TimerSpec.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext ctx,
      @StateId("heap") ValueState<EventHeap> heapState,
      @TimerId("next") Timer nextTimer) {
    EventHeap heap = firstNonNull(
      heapState.read(),
      EventHeap.createForKey(ctx.element().getKey()));
    heap.add(ctx.element().getValue());
    // When the watermark reaches this time, no more elements
    // can show up that have earlier timestamps
    nextTimer.set(heap.nextTimestamp().plus(allowedLateness);
  }

  @OnTimer("next")
  public void onNextTimestamp(
      OnTimerContext ctx,
      @StateId("heap") ValueState<EventHeap> heapState,
      @TimerId("next") Timer nextTimer) {
    EventHeap heap = heapState.read();
    // If the timer at time t was delivered the watermark must
    // be strictly greater than t
    while (!heap.nextTimestamp().isAfter(ctx.timestamp())) {
      writeToExternalSystem(heap.pop());
    }
    nextTimer.set(heap.nextTimestamp().plus(allowedLateness);
  }
}

无论您的基础用例是什么,这都有望让您开始前进。