为什么我不应该在 Spout.nextTuple() 中循环或阻塞
Why should I not loop or block in Spout.nextTuple()
我看到许多代码片段,其中在 Spout.nextTuple()
中使用了循环(例如读取整个文件并为每一行发出一个元组):
public void nextTuple() {
// do other stuff here
// reader might be BufferedReader that is initialized in open()
String str;
while((str = reader.readLine()) != null) {
_collector.emit(new Values(str));
}
// do some more stuff here
}
这段代码似乎很简单,但是,有人告诉我 不应该在 nextTuple()
中循环 。问题是为什么?
执行 Spout 时,它会在单个线程中运行。此线程循环 "forever" 并有多项职责:
- 致电
Spout.nextTuple()
- 检索 "acks" 并处理它们
- 检索 "fails" 并处理它们
- 超时元组
要做到这一点,重要的是,在向系统发出元组后,不要在 nextTuple()
中停留 "forever"(即循环或阻塞),而是在 return 中停留(或者只是 return 如果没有元组可以发出,但 不要阻塞 )。否则,Spout 无法正常工作。 nextTuple()
将被 Storm 循环调用。因此,在处理 ack/fail 消息后,下一次调用 nextTuple()
会很快发生。
因此,在对 nextTuple()
的单次调用中发出多个元组也被认为是不好的做法。只要代码停留在 nextTuple()
中,spout 线程就不能(例如)对传入的 acks 做出反应。这可能会导致不必要的超时,因为无法及时处理确认。
最佳做法是为每次调用 nextTuple()
发出一个元组。如果没有可用的元组被发射,你应该return(不发射)而不是等到有可用的元组。
我看到许多代码片段,其中在 Spout.nextTuple()
中使用了循环(例如读取整个文件并为每一行发出一个元组):
public void nextTuple() {
// do other stuff here
// reader might be BufferedReader that is initialized in open()
String str;
while((str = reader.readLine()) != null) {
_collector.emit(new Values(str));
}
// do some more stuff here
}
这段代码似乎很简单,但是,有人告诉我 不应该在 nextTuple()
中循环 。问题是为什么?
执行 Spout 时,它会在单个线程中运行。此线程循环 "forever" 并有多项职责:
- 致电
Spout.nextTuple()
- 检索 "acks" 并处理它们
- 检索 "fails" 并处理它们
- 超时元组
要做到这一点,重要的是,在向系统发出元组后,不要在 nextTuple()
中停留 "forever"(即循环或阻塞),而是在 return 中停留(或者只是 return 如果没有元组可以发出,但 不要阻塞 )。否则,Spout 无法正常工作。 nextTuple()
将被 Storm 循环调用。因此,在处理 ack/fail 消息后,下一次调用 nextTuple()
会很快发生。
因此,在对 nextTuple()
的单次调用中发出多个元组也被认为是不好的做法。只要代码停留在 nextTuple()
中,spout 线程就不能(例如)对传入的 acks 做出反应。这可能会导致不必要的超时,因为无法及时处理确认。
最佳做法是为每次调用 nextTuple()
发出一个元组。如果没有可用的元组被发射,你应该return(不发射)而不是等到有可用的元组。