重新启动 Storm 时再次从 Kafka 处理所有预处理的记录
processes all pre processed records again from Kafka when restart Storm
我正在从 Kafka 消费者读取数据到 Storm spout。但是,当我重新启动 Storm 时,它还会从 Kafka 读取以前处理过的记录。
重新启动时,我不想处理以前处理过的记录。
这是我的代码:
public class KafkaStormSample {
public static void main(String[] args) throws Exception {
SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, "/" + topic, UUID.randomUUID().toString());
kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
builder.setBolt("word-spitter", new SplitBolt()).shuffleGrouping("kafka-spout");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("KafkaStormSample", config, builder.createTopology());
}
}
问题是您为 SpoutConfig 使用的随机 UUID。相反,选择一个固定的字符串并每次都使用它。
无关:您不应该使用 storm-kafka
编写新代码。请改用 storm-kafka-client
。
连同 静态 UUID,您可以使用 StormSubmitter
将拓扑提交到 Storm 集群上的 运行。更多信息 here
我正在从 Kafka 消费者读取数据到 Storm spout。但是,当我重新启动 Storm 时,它还会从 Kafka 读取以前处理过的记录。 重新启动时,我不想处理以前处理过的记录。 这是我的代码:
public class KafkaStormSample {
public static void main(String[] args) throws Exception {
SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, "/" + topic, UUID.randomUUID().toString());
kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
builder.setBolt("word-spitter", new SplitBolt()).shuffleGrouping("kafka-spout");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("KafkaStormSample", config, builder.createTopology());
}
}
问题是您为 SpoutConfig 使用的随机 UUID。相反,选择一个固定的字符串并每次都使用它。
无关:您不应该使用 storm-kafka
编写新代码。请改用 storm-kafka-client
。
连同 静态 UUID,您可以使用 StormSubmitter
将拓扑提交到 Storm 集群上的 运行。更多信息 here