为什么我不应该在 Spout.nextTuple() 中循环或阻塞

Why should I not loop or block in Spout.nextTuple()

我看到许多代码片段,其中在 Spout.nextTuple() 中使用了循环(例如读取整个文件并为每一行发出一个元组):

public void nextTuple() {
    // do other stuff here

    // reader might be BufferedReader that is initialized in open()
    String str;
    while((str = reader.readLine()) != null) {
        _collector.emit(new Values(str));
    }

    // do some more stuff here
}

这段代码似乎很简单,但是,有人告诉我 不应该在 nextTuple() 中循环 。问题是为什么?

执行 Spout 时,它会在单个线程中运行。此线程循环 "forever" 并有多项职责:

  1. 致电Spout.nextTuple()
  2. 检索 "acks" 并处理它们
  3. 检索 "fails" 并处理它们
  4. 超时元组

要做到这一点,重要的是,在向系统发出元组后,不要在 nextTuple() 中停留 "forever"(即循环或阻塞),而是在 return 中停留(或者只是 return 如果没有元组可以发出,但 不要阻塞 )。否则,Spout 无法正常工作。 nextTuple() 将被 Storm 循环调用。因此,在处理 ack/fail 消息后,下一次调用 nextTuple() 会很快发生。

因此,在对 nextTuple() 的单次调用中发出多个元组也被认为是不好的做法。只要代码停留在 nextTuple() 中,spout 线程就不能(例如)对传入的 acks 做出反应。这可能会导致不必要的超时,因为无法及时处理确认。

最佳做法是为每次调用 nextTuple() 发出一个元组。如果没有可用的元组被发射,你应该return(不发射)而不是等到有可用的元组。