元组失败太多 - 风暴拓扑
Too many tuple failures- Storm Topology
我有一个风暴应用程序,其中有 1 个喷口和 5 个螺栓。拓扑工作正常。但我在 30 分钟后给出 Too many tuple failures
错误。在第一个螺栓到第二个螺栓中,由于某些分析条件,仅处理了 20% 的数据。 80% 的数据被丢弃。我认为这个错误是由于 80% 的数据被丢弃或其他原因造成的。不知道是什么原因,如何解决。
如果您在 Storm 中使用容错(即,将消息 ID 分配给 spout 中的元组),则需要 ack
all 个元组消耗 spout 的输出。即使您由于过滤条件而丢弃了一些元组,因为 "discarding a tuple" 仍然意味着该元组已完全处理,即,您需要将此告诉 Storm —— 否则,Storm 认为出了点问题(由于超时) 并使元组失败。
KafkaSpouts 自动分配消息 ID。您只需要确认所有传入的元组:
void execute(Tuple input) {
if(input-is-forwarded) {
collector.emit(input, new Values(/* generate output tuple */);
}
// ack tuple (regardless if forwarded or discarded)
collector.ack(input);
}
我有一个风暴应用程序,其中有 1 个喷口和 5 个螺栓。拓扑工作正常。但我在 30 分钟后给出 Too many tuple failures
错误。在第一个螺栓到第二个螺栓中,由于某些分析条件,仅处理了 20% 的数据。 80% 的数据被丢弃。我认为这个错误是由于 80% 的数据被丢弃或其他原因造成的。不知道是什么原因,如何解决。
如果您在 Storm 中使用容错(即,将消息 ID 分配给 spout 中的元组),则需要 ack
all 个元组消耗 spout 的输出。即使您由于过滤条件而丢弃了一些元组,因为 "discarding a tuple" 仍然意味着该元组已完全处理,即,您需要将此告诉 Storm —— 否则,Storm 认为出了点问题(由于超时) 并使元组失败。
KafkaSpouts 自动分配消息 ID。您只需要确认所有传入的元组:
void execute(Tuple input) {
if(input-is-forwarded) {
collector.emit(input, new Values(/* generate output tuple */);
}
// ack tuple (regardless if forwarded or discarded)
collector.ack(input);
}