Apache Storm Trident .each() 函数解释

Apache Storm Trident .each() function explanation

我想在项目中使用 Apache Storm 的 TridentTopology。我发现很难理解 storm.trident.Stream class 中的 .each() 函数。以下是他们教程中给出的示例代码,供参考:

TridentTopology topology = new TridentTopology();        
TridentState wordCounts =
     topology.newStream("spout1", spout)
       .each(new Fields("sentence"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                
       .parallelismHint(6);

我不明白.each() 方法的签名。下面是我的理解。如果我错了,请纠正我,并提供更多信息以供我了解。

.each()

第一个参数是对输入元组的投影。在您的示例中,只有名称为 "sentence" 的字段提供给 Split。如果您的源发出具有架构 Fields("first", "sentence", "third") 的元组,您只能访问 Split 中的 "sentence"。此外,"sentence" 在 Split 中的索引为零(而不是一)。请注意,它不是输出的投影——所有字段都将保留在输出元组中!这只是 Split.

内整个元组的有限视图

最后一个参数是 Splitemit()Value 的架构。此字段名称作为新属性附加到输出元组。因此,输出元组的模式是输入元组的模式(原始的,不是由第一个参数投影的)加上最后一个参数的字段。

请参阅文档中的 "Function" 部分:https://storm.apache.org/releases/0.10.0/Trident-API-Overview.html