Storm - 有条件地消耗来自kafka spout的流?

Storm - Conditionally consuming stream from kafka spout?

我有一个场景,我将 json 发布到 Kafka 实例。然后我使用 Kafka Spout 将流发射到螺栓。

但现在我想在我的 json 消息中添加额外的字段(称之为 x)。如果 xa 我希望它被 boltA 使用,如果 xb 我希望它被 boltB 使用。

有没有办法根据流内容将流定向到正确的螺栓?

最简单的方法应该是添加一个 SplitBoltKafkaSpout 消费,评估字段 x ,并转发到不同的输出流:

public class SplitBolt extends BaseRichBolt {
  OutputCollector collector;

  public void prepare(...) {
    this.collector = collector;
  }

  public void execute(Tuple input) {
    Object x = ... // get field x from input
    String streamId;
    if(x == a) {
      streamId = "stream-xa";
    } else { // x == b
      streamId = "stream-xb";
    }
    collector.emit(streamId, input, input.getValues());
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    Fields schema = new Fields(...)
    declarer.declareStream("stream-xa", schema);
    declarer.declareStream("stream-xy", schema);
  }
}

构建拓扑时,您将 BoltA 连接到 "stream-xa" 并将 BoltB 连接到 "stream-xb":

TopologyBuilder b = new TopologyBuilder();
b.setSpout("spout", new KafkaSpout(...));
b.setBolt("split", new SplitBolt()).shuffleGrouping("spout");
b.setBolt("boltA", new BoltA()).shuffleGrouping("split", "stream-xa");
b.setBolt("boltB", new BoltB()).shuffleGrouping("split", "stream-xb");

作为替代方案,也应该可以从 KafkaSpout 继承并直接发送到两个不同的流。但是,代码更难正确。