Storm:使用 tick 元组进行批处理时,为什么要等待 ack 元组?

Storm: When batching with tick tuples, why wait to ack tuples?

在我的拓扑中,我需要为通过它的每个元组执行插入语句。为了更好地处理我的数据库,我使用滴答元组模式对插入进行批处理。

我在网上看到的帖子指导实现模式如下:

-批量收集元组

-当 tick 发生时刷新批次(或者当批次增长超过一定大小时)

-确认批处理中的所有元组

但是,为什么我要等到我刷新批次来确认我的元组?如果刷新批处理出现异常(如数据库 timeout/error),批处理中的所有元组最终不会超时并重播怎么办?

如果我在批处理之前确认元组,而是根据元组内容对某些对象进行批处理,则不会重放元组。如果刷新我的批次失败,该批次将不会在异常时清除,并且其中的所有消息将在下次出现滴答时尝试再次插入?

If I ack the tuples prior to batching, and instead batch some Object based on the tuple contents, then the tuples will not be replayed.

是的,你确实是对的;这就是为什么你应该只在批处理成功后才确认它们。您确实希望所有消息都得到处理吗?

But, why do I want to wait until I flush the batch to ack my tuples? What if flushing the batch has an exception (like a database timeout/error), won't all the tuples in the batch eventually timeout and get replayed?

是的,元组将在超时时重播。但是,如果批次失败,您应该使它们失败(或重试批次)。


现在让我再给你一条建议,你不希望重放元组;它会导致数据源的巨大性能下降,例如Kafka非常快,因为它执行顺序读取,一个元组重放让kafka去寻找要重放的元组。因此你应该:

  1. 如果批处理失败,请检查元组是否真的可以插入到数据库中。例如,您可能在数据库中有一个 not null constraint 而您的元组字段为空。在这种情况下,您应该确认元组,因为您将永远无法将此元组插入数据库。
  2. 您应该在失败之前重试插入元组
  3. 您想使元组失败而不是让它们超时。等待元组超时不是一个好的做法,而是让它们失败。您可以在 Storm UI 上看到元组在哪个螺栓上失败,您看不到元组在哪个螺栓上超时。
  4. 记录元组失败,因为如果无法插入元组(例如记住 not null 约束),您想知道这类事情并更改代码以处理这种情况(例如建议 1,但是有其他)。

我不能完全理解你的描述。但是,您应该执行以下操作:

  1. 批量收集元组
  2. 齐平元组(刻度或大小)

    • 成功插入事务后,确认批次的所有元组
    • 插入失败,没有确认(稍后再尝试插入,直到插入成功)

作为重试模式,您可以使用例如下一个填满的批次或下一个滴答元组。对于这种情况,您只需允许更大的批次大小或尝试依次插入两个批次。

如果您在成功插入数据库之前确认元组,则可能会在 bolt 崩溃时丢失元组。在确认元组之后,Storm 允许 Spout 删除重新计算尚未插入的元组所需的源元组。因此,您无法重新计算它们。

作为替代方案,您也可以使批处理中的所有元组失败(如果无法插入)并触发 spout 重播源元组。这样做的好处是,您不会在 DB-insert-bolt 中建立 larger/multiple 批次。然而,缺点当然是Storm必须处理那些元组两次。