Storm - 有条件地消耗来自kafka spout的流?
Storm - Conditionally consuming stream from kafka spout?
我有一个场景,我将 json 发布到 Kafka 实例。然后我使用 Kafka Spout 将流发射到螺栓。
但现在我想在我的 json 消息中添加额外的字段(称之为 x
)。如果 x
是 a
我希望它被 boltA 使用,如果 x
是 b
我希望它被 boltB 使用。
有没有办法根据流内容将流定向到正确的螺栓?
最简单的方法应该是添加一个 SplitBolt
从 KafkaSpout
消费,评估字段 x
,并转发到不同的输出流:
public class SplitBolt extends BaseRichBolt {
OutputCollector collector;
public void prepare(...) {
this.collector = collector;
}
public void execute(Tuple input) {
Object x = ... // get field x from input
String streamId;
if(x == a) {
streamId = "stream-xa";
} else { // x == b
streamId = "stream-xb";
}
collector.emit(streamId, input, input.getValues());
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
Fields schema = new Fields(...)
declarer.declareStream("stream-xa", schema);
declarer.declareStream("stream-xy", schema);
}
}
构建拓扑时,您将 BoltA
连接到 "stream-xa" 并将 BoltB
连接到 "stream-xb":
TopologyBuilder b = new TopologyBuilder();
b.setSpout("spout", new KafkaSpout(...));
b.setBolt("split", new SplitBolt()).shuffleGrouping("spout");
b.setBolt("boltA", new BoltA()).shuffleGrouping("split", "stream-xa");
b.setBolt("boltB", new BoltB()).shuffleGrouping("split", "stream-xb");
作为替代方案,也应该可以从 KafkaSpout
继承并直接发送到两个不同的流。但是,代码更难正确。
我有一个场景,我将 json 发布到 Kafka 实例。然后我使用 Kafka Spout 将流发射到螺栓。
但现在我想在我的 json 消息中添加额外的字段(称之为 x
)。如果 x
是 a
我希望它被 boltA 使用,如果 x
是 b
我希望它被 boltB 使用。
有没有办法根据流内容将流定向到正确的螺栓?
最简单的方法应该是添加一个 SplitBolt
从 KafkaSpout
消费,评估字段 x
,并转发到不同的输出流:
public class SplitBolt extends BaseRichBolt {
OutputCollector collector;
public void prepare(...) {
this.collector = collector;
}
public void execute(Tuple input) {
Object x = ... // get field x from input
String streamId;
if(x == a) {
streamId = "stream-xa";
} else { // x == b
streamId = "stream-xb";
}
collector.emit(streamId, input, input.getValues());
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
Fields schema = new Fields(...)
declarer.declareStream("stream-xa", schema);
declarer.declareStream("stream-xy", schema);
}
}
构建拓扑时,您将 BoltA
连接到 "stream-xa" 并将 BoltB
连接到 "stream-xb":
TopologyBuilder b = new TopologyBuilder();
b.setSpout("spout", new KafkaSpout(...));
b.setBolt("split", new SplitBolt()).shuffleGrouping("spout");
b.setBolt("boltA", new BoltA()).shuffleGrouping("split", "stream-xa");
b.setBolt("boltB", new BoltB()).shuffleGrouping("split", "stream-xb");
作为替代方案,也应该可以从 KafkaSpout
继承并直接发送到两个不同的流。但是,代码更难正确。