Storm 如何知道消息何时为 "fully processed"?

How does Storm know when a message is "fully processed"?

(还有几个关于超时和 maxSpoutPending 的问题)

我在 Storm 文档中看到很多关于消息被完全处理的参考资料。但是我的 KafkaSpout 如何知道消息何时被完全处理?

希望它知道我的螺栓连接方式,以便当我的流中的最后一个螺栓确认一个元组时,喷口知道我的消息何时被处理?

否则,我会想象在超时期限到期后,检查消息的 ack-ed 状态,如果由 acking/anchoring 异或指示,则认为它已处理。但我希望不是这样?

我也有关于maxTuplesPending和超时配置的相关问题。

如果我将 maxTuplePending 设置为 10k,那么我认为每个 spout 实例将继续发出元组直到该 spout 实例正在跟踪 10k 个正在运行的元组,还有 10k 个尚未完全处理的元组的想法是否正确?然后当当前正在运行的消息被完全处理时发出一个新的元组?

最后,这与超时配置有关吗?在发出新消息之前,喷口是否以任何方式等待配置的超时发生?或者超时配置是否仅在消息 stalled/slow 正在处理时才起作用,导致它因超时而失败?

更简洁(或希望更清楚),将我的超时设置为 30 分钟是否有效果,除了消息不会失败,除非它们在 30 分钟内被最终的 Bolt 确认?还是有其他影响,比如超时配置影响spouts的发射率?

对于冗长而杂乱的问题,我们深表歉意。预先感谢您的任何回复。

*编辑以进一步澄清

我之所以担心,是因为我的消息不一定 运行 通过整个流。

假设我有螺栓 A、B、C、D。大多数时候消息将从 A->B->->D 传递。但是我有一些消息会故意在螺栓 A 上停止。A 会确认它们但不会发出它们(因为我的业务逻辑,在那些情况下我确实希望进一步处理消息)。

那么我的 KafkaSpout 是否会知道已确认但未从 A 发出的消息已完全处理?在这种情况下,我希望在 Bolt A 完成后立即从 spout 发出另一条消息。

Storm 通过 UDF 代码必须使用的锚定机制在整个拓扑中跟踪元组。这种锚定导致了所谓的元组树,树的根是喷口发出的元组,所有其他节点(在树结构中连接)表示从使用的螺栓 发出的元组输入元组作为锚点(这只是一个逻辑模型,但在 Storm 中并未以这种方式实现)。

例如,Spout 发出一个句子元组,该元组由单词中的第一个螺栓拆分,第二个螺栓过滤一些单词,第三个螺栓应用单词计数。最后,沉头螺栓将结果写入文件。树看起来像这样:

"this is an example sentence" -+-> "this" 
                               +-> "is" 
                               +-> "an"
                               +-> "example" -> "example",1 -> "example",1
                               +-> "sentence" -> "sentence",1 -> "sentence",1

初始句子由 spout 发出,由 bolt1 用作发出的所有标记的锚点,并由 bolt1 确认。 Bolt2 过滤掉 "this"、"is" 和 "an" 并仅确认三个元组。 "example" 和 "sentence" 只是转发,用作输出元组的锚点,然后进行确认。同样的事情发生在 bolt2 中,最终的 sink bolt 只确认所有传入的元组。

此外,Storm 跟踪所有元组的所有 acks,即来自中间 bolts 和 sink bolts 的所有 acks。首先,spout 将输出元组的 ID 发送到 acker 任务。每次使用元组作为锚点时,acker 也会收到一条消息,其中包含锚点元组 ID 和输出元组 ID(由 Storm 自动生成)。来自 bolt 的 ackes 也转到与它们异或的相同 acker 任务。如果收到所有 acks——即,对于 spout 和所有递归锚定的输出元组——(异或结果将为零),acker 将向 spout 发送一条消息,表明元组已完全处理,并返回 Spout.ack(MessageId) 发生(即,当元组被完全处理时,回调用立即完成)。此外,ackers 会定期检查是否存在 acker 注册的元组超过超时时间。如果发生这种情况,元组 ID 将被 acker 丢弃,并向 spout 发送元组失败的消息(导致调用 Spout.fail(MessageId))。

此外,Spouts 会保留所有正在运行的元组的计数,如果此计数超过 maxTuplesPending 参数,则停止调用 Spout.nextTuple()。据我所知,该参数是全局应用的,即每个 spout 任务的本地计数相加,并将全局计数与参数进行比较(虽然不确定如何详细实现)。

所以timeout参数独立于maxTuplesPending