重新启动 Storm 时再次从 Kafka 处理所有预处理的记录

processes all pre processed records again from Kafka when restart Storm

我正在从 Kafka 消费者读取数据到 Storm spout。但是,当我重新启动 Storm 时,它还会从 Kafka 读取以前处理过的记录。 重新启动时,我不想处理以前处理过的记录。 这是我的代码:

public class KafkaStormSample {
    public static void main(String[] args) throws Exception {

        SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, "/" + topic, UUID.randomUUID().toString());
        kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
        builder.setBolt("word-spitter", new SplitBolt()).shuffleGrouping("kafka-spout");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("KafkaStormSample", config, builder.createTopology());
    }
}

问题是您为 SpoutConfig 使用的随机 UUID。相反,选择一个固定的字符串并每次都使用它。

无关:您不应该使用 storm-kafka 编写新代码。请改用 storm-kafka-client

连同 静态 UUID,您可以使用 StormSubmitter 将拓扑提交到 Storm 集群上的 运行。更多信息 here