Storm 拓扑中的可选流
Optional stream in Storm topology
我们有一个相当简单的风暴拓扑,让人头疼。
我们的一个螺栓可以发现它正在处理的数据是有效的并且每件事都正常进行,或者它可以发现它无效但可以修复。在这种情况下,我们需要将其发送以进行一些额外处理。
我们尝试使用单独的螺栓和流将此步骤作为拓扑的一部分。
declarer.declareStream(NORMAL_STREAM, getStreamFields());
declarer.declareStream(ERROR_STREAM, getErrorStreamFields());
在 execute 方法的末尾执行类似下面的内容。
if(errorOutput != null) {
collector.emit(ERROR_STREAM, input, errorOutput);
}
else {
collector.emit(NORMAL_STREAM, input, output);
}
collector.ack(input);
这确实有效,但是它有一个破坏性的效果,导致所有没有沿着这个错误路径走下去的元组都失败并被 spout 无休止地重新发送。
我认为这是因为错误螺栓无法为其未收到的消息发送确认,但确认事件会等待拓扑中的所有螺栓确认,然后再将确认发送回喷口。至少取出错误处理螺栓会导致所有东西都正确地返回到 spout。
实现这样的目标的最佳方法是什么?
错误螺栓可能比您怀疑的要慢,导致 error_stream 上的备份,进而导致第一个螺栓的备份,最终导致元组开始超时。当元组超时时,它会被 spout 重新发送。
尝试:
- 增加超时配置(topology.message.timeout.secs),
- 限制 spout 中的飞行元组数量 (topology.max.spout.pending) and/or
- 增加螺栓的并行度
我们有一个相当简单的风暴拓扑,让人头疼。
我们的一个螺栓可以发现它正在处理的数据是有效的并且每件事都正常进行,或者它可以发现它无效但可以修复。在这种情况下,我们需要将其发送以进行一些额外处理。
我们尝试使用单独的螺栓和流将此步骤作为拓扑的一部分。
declarer.declareStream(NORMAL_STREAM, getStreamFields());
declarer.declareStream(ERROR_STREAM, getErrorStreamFields());
在 execute 方法的末尾执行类似下面的内容。
if(errorOutput != null) {
collector.emit(ERROR_STREAM, input, errorOutput);
}
else {
collector.emit(NORMAL_STREAM, input, output);
}
collector.ack(input);
这确实有效,但是它有一个破坏性的效果,导致所有没有沿着这个错误路径走下去的元组都失败并被 spout 无休止地重新发送。
我认为这是因为错误螺栓无法为其未收到的消息发送确认,但确认事件会等待拓扑中的所有螺栓确认,然后再将确认发送回喷口。至少取出错误处理螺栓会导致所有东西都正确地返回到 spout。
实现这样的目标的最佳方法是什么?
错误螺栓可能比您怀疑的要慢,导致 error_stream 上的备份,进而导致第一个螺栓的备份,最终导致元组开始超时。当元组超时时,它会被 spout 重新发送。
尝试:
- 增加超时配置(topology.message.timeout.secs),
- 限制 spout 中的飞行元组数量 (topology.max.spout.pending) and/or
- 增加螺栓的并行度