kafka spout 不发射数据

kafka spout is not emitting data

我正在尝试将 Kafka 与 storm.I 集成我正在使用 Kafka Spout 从 Kafka 主题中检索数据并将其提供给风暴螺栓以进一步 processing.I 我能够成功提交拓扑但是spout 没有发出任何 data.It 也不会抛出任何错误。我对 Kafka 和 Storm.So 很陌生,我无法理解这背后的原因 problem.Please 建议提前 modifications.Thanks !!

我的拓扑:

public class TopologyMain {

 private static final String SENTENCE_SPOUT_ID = "kafka-sentence-spout";


public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException {
    int numSpoutExecutors = 1;


    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout(SENTENCE_SPOUT_ID, buildKafkaSentenceSpout(), numSpoutExecutors);
    builder.setBolt("word-normalizer", new WordNormalizer())
        .shuffleGrouping(SENTENCE_SPOUT_ID);
    builder.setBolt("word-counter", new WordCounter(),2)
        .shuffleGrouping("word-normalizer");

    //Configuration
    Config conf = new Config();
    conf.setDebug(false);
    //Topology run
    conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
    conf.put(Config.NIMBUS_HOST, "192.168.1.229");
    conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
    System.setProperty("storm.jar", "/home/ubuntu/st/stIn/target/storm-wc.jar");
    StormSubmitter.submitTopology("Count-Word-Topology", conf,builder.createTopology());

}



 private static KafkaSpout buildKafkaSentenceSpout() {
      BrokerHosts hosts = new ZkHosts("localhost:2181");
      SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "/acking-kafka-sentence-spout", "acking-sentence-spout");
      spoutConfig.forceFromStart = true;
      spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
      return new KafkaSpout(spoutConfig);
    }
 }

我明确地将我项目的 Maven 依赖项中的所有 jar 复制到风暴库并且一切正常fine.Also我将风暴 jar(用于提交拓扑的 jar)复制到 storm/lib.