Storm 如何知道消息何时为 "fully processed"?
How does Storm know when a message is "fully processed"?
(还有几个关于超时和 maxSpoutPending 的问题)
我在 Storm 文档中看到很多关于消息被完全处理的参考资料。但是我的 KafkaSpout 如何知道消息何时被完全处理?
希望它知道我的螺栓连接方式,以便当我的流中的最后一个螺栓确认一个元组时,喷口知道我的消息何时被处理?
否则,我会想象在超时期限到期后,检查消息的 ack-ed 状态,如果由 acking/anchoring 异或指示,则认为它已处理。但我希望不是这样?
我也有关于maxTuplesPending和超时配置的相关问题。
如果我将 maxTuplePending 设置为 10k,那么我认为每个 spout 实例将继续发出元组直到该 spout 实例正在跟踪 10k 个正在运行的元组,还有 10k 个尚未完全处理的元组的想法是否正确?然后当当前正在运行的消息被完全处理时发出一个新的元组?
最后,这与超时配置有关吗?在发出新消息之前,喷口是否以任何方式等待配置的超时发生?或者超时配置是否仅在消息 stalled/slow 正在处理时才起作用,导致它因超时而失败?
更简洁(或希望更清楚),将我的超时设置为 30 分钟是否有效果,除了消息不会失败,除非它们在 30 分钟内被最终的 Bolt 确认?还是有其他影响,比如超时配置影响spouts的发射率?
对于冗长而杂乱的问题,我们深表歉意。预先感谢您的任何回复。
*编辑以进一步澄清
我之所以担心,是因为我的消息不一定 运行 通过整个流。
假设我有螺栓 A、B、C、D。大多数时候消息将从 A->B->->D 传递。但是我有一些消息会故意在螺栓 A 上停止。A 会确认它们但不会发出它们(因为我的业务逻辑,在那些情况下我确实希望进一步处理消息)。
那么我的 KafkaSpout 是否会知道已确认但未从 A 发出的消息已完全处理?在这种情况下,我希望在 Bolt A 完成后立即从 spout 发出另一条消息。
Storm 通过 UDF 代码必须使用的锚定机制在整个拓扑中跟踪元组。这种锚定导致了所谓的元组树,树的根是喷口发出的元组,所有其他节点(在树结构中连接)表示从使用的螺栓 发出的元组输入元组作为锚点(这只是一个逻辑模型,但在 Storm 中并未以这种方式实现)。
例如,Spout 发出一个句子元组,该元组由单词中的第一个螺栓拆分,第二个螺栓过滤一些单词,第三个螺栓应用单词计数。最后,沉头螺栓将结果写入文件。树看起来像这样:
"this is an example sentence" -+-> "this"
+-> "is"
+-> "an"
+-> "example" -> "example",1 -> "example",1
+-> "sentence" -> "sentence",1 -> "sentence",1
初始句子由 spout 发出,由 bolt1 用作发出的所有标记的锚点,并由 bolt1 确认。 Bolt2 过滤掉 "this"、"is" 和 "an" 并仅确认三个元组。 "example" 和 "sentence" 只是转发,用作输出元组的锚点,然后进行确认。同样的事情发生在 bolt2 中,最终的 sink bolt 只确认所有传入的元组。
此外,Storm 跟踪所有元组的所有 acks,即来自中间 bolts 和 sink bolts 的所有 acks。首先,spout 将输出元组的 ID 发送到 acker 任务。每次使用元组作为锚点时,acker 也会收到一条消息,其中包含锚点元组 ID 和输出元组 ID(由 Storm 自动生成)。来自 bolt 的 ackes 也转到与它们异或的相同 acker 任务。如果收到所有 acks——即,对于 spout 和所有递归锚定的输出元组——(异或结果将为零),acker 将向 spout 发送一条消息,表明元组已完全处理,并返回 Spout.ack(MessageId)
发生(即,当元组被完全处理时,回调用立即完成)。此外,ackers 会定期检查是否存在 acker 注册的元组超过超时时间。如果发生这种情况,元组 ID 将被 acker 丢弃,并向 spout 发送元组失败的消息(导致调用 Spout.fail(MessageId)
)。
此外,Spouts 会保留所有正在运行的元组的计数,如果此计数超过 maxTuplesPending
参数,则停止调用 Spout.nextTuple()
。据我所知,该参数是全局应用的,即每个 spout 任务的本地计数相加,并将全局计数与参数进行比较(虽然不确定如何详细实现)。
所以timeout
参数独立于maxTuplesPending
。
(还有几个关于超时和 maxSpoutPending 的问题)
我在 Storm 文档中看到很多关于消息被完全处理的参考资料。但是我的 KafkaSpout 如何知道消息何时被完全处理?
希望它知道我的螺栓连接方式,以便当我的流中的最后一个螺栓确认一个元组时,喷口知道我的消息何时被处理?
否则,我会想象在超时期限到期后,检查消息的 ack-ed 状态,如果由 acking/anchoring 异或指示,则认为它已处理。但我希望不是这样?
我也有关于maxTuplesPending和超时配置的相关问题。
如果我将 maxTuplePending 设置为 10k,那么我认为每个 spout 实例将继续发出元组直到该 spout 实例正在跟踪 10k 个正在运行的元组,还有 10k 个尚未完全处理的元组的想法是否正确?然后当当前正在运行的消息被完全处理时发出一个新的元组?
最后,这与超时配置有关吗?在发出新消息之前,喷口是否以任何方式等待配置的超时发生?或者超时配置是否仅在消息 stalled/slow 正在处理时才起作用,导致它因超时而失败?
更简洁(或希望更清楚),将我的超时设置为 30 分钟是否有效果,除了消息不会失败,除非它们在 30 分钟内被最终的 Bolt 确认?还是有其他影响,比如超时配置影响spouts的发射率?
对于冗长而杂乱的问题,我们深表歉意。预先感谢您的任何回复。
*编辑以进一步澄清
我之所以担心,是因为我的消息不一定 运行 通过整个流。
假设我有螺栓 A、B、C、D。大多数时候消息将从 A->B->->D 传递。但是我有一些消息会故意在螺栓 A 上停止。A 会确认它们但不会发出它们(因为我的业务逻辑,在那些情况下我确实希望进一步处理消息)。
那么我的 KafkaSpout 是否会知道已确认但未从 A 发出的消息已完全处理?在这种情况下,我希望在 Bolt A 完成后立即从 spout 发出另一条消息。
Storm 通过 UDF 代码必须使用的锚定机制在整个拓扑中跟踪元组。这种锚定导致了所谓的元组树,树的根是喷口发出的元组,所有其他节点(在树结构中连接)表示从使用的螺栓 发出的元组输入元组作为锚点(这只是一个逻辑模型,但在 Storm 中并未以这种方式实现)。
例如,Spout 发出一个句子元组,该元组由单词中的第一个螺栓拆分,第二个螺栓过滤一些单词,第三个螺栓应用单词计数。最后,沉头螺栓将结果写入文件。树看起来像这样:
"this is an example sentence" -+-> "this"
+-> "is"
+-> "an"
+-> "example" -> "example",1 -> "example",1
+-> "sentence" -> "sentence",1 -> "sentence",1
初始句子由 spout 发出,由 bolt1 用作发出的所有标记的锚点,并由 bolt1 确认。 Bolt2 过滤掉 "this"、"is" 和 "an" 并仅确认三个元组。 "example" 和 "sentence" 只是转发,用作输出元组的锚点,然后进行确认。同样的事情发生在 bolt2 中,最终的 sink bolt 只确认所有传入的元组。
此外,Storm 跟踪所有元组的所有 acks,即来自中间 bolts 和 sink bolts 的所有 acks。首先,spout 将输出元组的 ID 发送到 acker 任务。每次使用元组作为锚点时,acker 也会收到一条消息,其中包含锚点元组 ID 和输出元组 ID(由 Storm 自动生成)。来自 bolt 的 ackes 也转到与它们异或的相同 acker 任务。如果收到所有 acks——即,对于 spout 和所有递归锚定的输出元组——(异或结果将为零),acker 将向 spout 发送一条消息,表明元组已完全处理,并返回 Spout.ack(MessageId)
发生(即,当元组被完全处理时,回调用立即完成)。此外,ackers 会定期检查是否存在 acker 注册的元组超过超时时间。如果发生这种情况,元组 ID 将被 acker 丢弃,并向 spout 发送元组失败的消息(导致调用 Spout.fail(MessageId)
)。
此外,Spouts 会保留所有正在运行的元组的计数,如果此计数超过 maxTuplesPending
参数,则停止调用 Spout.nextTuple()
。据我所知,该参数是全局应用的,即每个 spout 任务的本地计数相加,并将全局计数与参数进行比较(虽然不确定如何详细实现)。
所以timeout
参数独立于maxTuplesPending
。