Storm 中的字段和值连接
Field and values connection in Storm
我有一个关于storm 的基本问题。我能清楚地理解一些基本的东西。例如我有一个 main class 里面有这个代码:
...
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SENTENCE_SPOUT_ID, new SentenceSpout());
builder.setBolt(SPLIT_BOLT_ID, new SplitSentenceBolt()).shuffleGrouping(SENTENCE_SPOUT_ID);
builder.setBolt(COUNT_BOLT_ID, new WordCountBolt(), 3).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
builder.setBolt(REPORT_BOLT_ID, new ReportBolt()).globalGrouping(COUNT_BOLT_ID);
...
我知道 1st 元素(例如 "SENTENCE_SPOUT_ID")是 bolt/spout 的 ID,以显示其中 2 个之间的联系. 2nd 元素 (ex.new SentenceSpout()) 指定我们在拓扑中设置的喷口或粗体。 3rd 元素标记了我们为这个特定的 bolt spout 需要的任务数。
然后我们使用 .fieldsGrouping 或 .shuffleGrouping 等来指定分组的类型,然后在括号之间 1st 元素是与 bolt/spout 的连接输入和 2nd(例如 new Fields("word"))决定了我们将分组的字段。
其中一个螺栓的代码内:
public class SplitSentenceBolt extends BaseRichBolt{
private OutputCollector collector;
public void prepare(Map config, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple tuple) {
this.collector.emit(a, new Values(word, time, name));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
在 this.collector.emit(a, new Values(word, time, name));
处,a 是 stream_ID,values(...) 是元组的元素。
在 declarer.declare(new Fields("word"));
处,单词必须是先前值之一。 我对之前的一切都正确吗?
所以我的问题是: declarer.declare(new Fields("word"));
word 必须与 word[ 相同=42=] in this.collector.emit(a, new Values(word, time, name));
和 word in builder.setBolt(COUNT_BOLT_ID, new WordCountBolt(), 3).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
一样 ????
您在 declareOutputFields 中声明的字段的 数量和顺序 应与您发出的字段匹配。
我推荐的两个更改:
- 现在通过省略第一个参数使用默认流:
collector.emit(new Values(word, time, name));
- 确保声明相同数量的字段:
declarer.declare(new Fields("word", "time", "name"));
我有一个关于storm 的基本问题。我能清楚地理解一些基本的东西。例如我有一个 main class 里面有这个代码:
...
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SENTENCE_SPOUT_ID, new SentenceSpout());
builder.setBolt(SPLIT_BOLT_ID, new SplitSentenceBolt()).shuffleGrouping(SENTENCE_SPOUT_ID);
builder.setBolt(COUNT_BOLT_ID, new WordCountBolt(), 3).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
builder.setBolt(REPORT_BOLT_ID, new ReportBolt()).globalGrouping(COUNT_BOLT_ID);
...
我知道 1st 元素(例如 "SENTENCE_SPOUT_ID")是 bolt/spout 的 ID,以显示其中 2 个之间的联系. 2nd 元素 (ex.new SentenceSpout()) 指定我们在拓扑中设置的喷口或粗体。 3rd 元素标记了我们为这个特定的 bolt spout 需要的任务数。
然后我们使用 .fieldsGrouping 或 .shuffleGrouping 等来指定分组的类型,然后在括号之间 1st 元素是与 bolt/spout 的连接输入和 2nd(例如 new Fields("word"))决定了我们将分组的字段。
其中一个螺栓的代码内:
public class SplitSentenceBolt extends BaseRichBolt{
private OutputCollector collector;
public void prepare(Map config, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple tuple) {
this.collector.emit(a, new Values(word, time, name));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
在 this.collector.emit(a, new Values(word, time, name));
处,a 是 stream_ID,values(...) 是元组的元素。
在 declarer.declare(new Fields("word"));
处,单词必须是先前值之一。 我对之前的一切都正确吗?
所以我的问题是: declarer.declare(new Fields("word"));
word 必须与 word[ 相同=42=] in this.collector.emit(a, new Values(word, time, name));
和 word in builder.setBolt(COUNT_BOLT_ID, new WordCountBolt(), 3).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
一样 ????
您在 declareOutputFields 中声明的字段的 数量和顺序 应与您发出的字段匹配。
我推荐的两个更改:
- 现在通过省略第一个参数使用默认流:
collector.emit(new Values(word, time, name));
- 确保声明相同数量的字段:
declarer.declare(new Fields("word", "time", "name"));