在 worker 失败或 bundle 重试期间如何维护 exactly-once 处理?

How is exactly-once processing maintained during worker failures or bundle retries?

我在 Dataflow 上有一个管道 运行 可以提取包含数千条记录的文件。这些文件以稳定的频率到达,由带有计时器的有状态 ParDo 处理,该计时器试图通过批处理和保存这些文件来限制摄取速率,直到计时器触发,然后通过文件处理 ParDo 扩展为单独的记录元素,并且终于写入 BigQuery 目的地。

有时,无论是 OOM 事件还是自动缩放事件等间歇性事件,我都看到 Dataflow 试图在事件解决后发出有状态 ParDo 中的文件,从而在文件处理 ParDo 重新处理时导致下游出现重复记录元素文件。我知道如果失败会重试捆绑包,但它们是否考虑了重复项?

How/What 是在此上下文中实现的恰好一次处理,特别是关于 State/Timer API,因为我在目的地看到重复项?

Dataflow 通过确保失败的 worker 产生的数据不会传递到下游(或者,更准确地说,如果重试工作,只有一个成功的结果被下游消耗),从而实现了 exactly once 处理。例如,如果您的管道的阶段 A 正在生产元素,而阶段 B 正在计算它们,并且阶段 A 中的工作人员失败并且是 re-tried,阶段 B 将不会计算重复元素(当然,阶段 B 可能本身必须重试)。这也适用于状态和定时器——一个给定的工作包要么被整体提交(即输入集被标记为已消耗,并且输出集与状态和定时器的 consumption/setting 原子地提交)或完全丢弃(state/timers 剩下 unconsumed/untouched 并且重试不会受到之前发生的事情的影响。)

不完全是与外部系统的交互(由于重试的可能性)。相反,这些至少有一次,因此为了保证正确性,所有此类交互都应该是幂等的。接收器通常通过分配一个唯一的 id 来实现这一点,这样可以在下游系统中对多个写入进行重复数据删除。对于文件,可以写入临时文件,然后将“获胜”的一组分片重命名为障碍后的最终目的地。从您的问题中不清楚您正在发出(或摄取)哪些文件,但希望这有助于理解系统的工作原理。

更具体地说,假设初始状态是{state: A, timers: [X, Y], inputs: [i, j, k]}。进一步假设在处理包(这些计时器和输入)时,状态更新为 B,我们向下游发出元素 m 和 n,并设置计时器 W.

如果捆绑成功,新状态将是{state: B, timers: [W], inputs: []}并且保证元素[m, n]被传递到下游。此外,此捆绑包的任何竞争重试总是会失败。

另一方面,如果 bundle 失败(即使它“发射”了一些元素或试图更新状态)系统的结果状态将是 {state: A, timers: [X, Y], inputs: [i, j, k]} 进行新的重试,下游不会观察到从这个失败的 bundle 发出的任何东西。

另一种看待它的方式是,集合{消耗的输入、消耗的定时器、状态修改、定时器集、下游产生的输出}在单个事务中写入支持“数据库”。只会提交一次成功的尝试,失败的尝试将被丢弃。

可以在 https://beam.apache.org/documentation/runtime/model/

找到更多详细信息