为什么我的所有 Kafka 消息都在 Storm 中重播?
Why are all my Kafka messages being replayed in Storm?
我想弄清楚为什么每次我重新启动 Storm 拓扑时我的所有 Kafka 消息都会被重播。
我的理解是,一旦最后一个 Bolt 确认了元组,spout 就应该在 Kafka 上提交消息,因此我不应该在重启后看到它重播。
我的代码是一个简单的 Kafka-spout 和一个 Bolt,它只打印每条消息然后确认它们。
private static KafkaSpout buildKafkaSpout(String topicName) {
ZkHosts zkHosts = new ZkHosts("localhost:2181");
SpoutConfig spoutConfig = new SpoutConfig(zkHosts,
topicName,
"/" + topicName,
"mykafkaspout"); /*was:UUID.randomUUID().toString()*/
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
return new KafkaSpout(spoutConfig);
}
public static class PrintBolt extends BaseRichBolt {
OutputCollector _collector;
public static Logger LOG = LoggerFactory.getLogger(PrintBolt.class);
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
LOG.error("PrintBolt.0: {}",tuple.getString(0));
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("nothing"));
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka", buildKafkaSpout("mytopic"), 1);
builder.setBolt("print1", new PrintBolt(),1).shuffleGrouping("kafka");
}
除了代码中的设置,我没有提供任何配置设置。
我是否缺少配置设置或我做错了什么?
更新:
澄清一下,在我重新启动管道之前一切正常。以下行为是我在其他(非风暴)消费者中可以获得的,以及我对 KafkaSpout
的期望
我的期望:
然而,我使用默认设置的实际行为如下。消息处理得很好,直到我停止管道,然后当我重新启动时,我得到所有消息的重播,包括那些我认为我已经确认的消息(A 和 B)
实际情况:
根据 Matthias 提到的 configuration options,我可以将 startOffsetTime
更改为 Latest
,但这实际上是管道丢弃消息的最新位置(消息 "C") 是在管道重新启动时生成的。
我有一个用 NodeJS 编写的消费者(使用 npm kafka-node),它能够向 Kafka 确认消息,当我重新启动 NodeJs 消费者时,它完全符合我的预期(赶上消息 "C"是在消费者情绪低落并从那里继续时产生的)——那么我如何使用 KafkaSpout 获得相同的行为?
问题出在提交代码中——如果 storm jar
是 运行 而没有拓扑名称,提交拓扑的模板代码将创建 LocalCluster
的实例,并且本地集群不捕获状态,因此不捕获重放。
所以
$ storm jar myjar.jar storm.myorg.MyTopology topologyname
将在我的 single node development cluster 上启动它,其中
$ storm jar myjar.jar storm.myorg.MyTopology
将在 LocalCluster
的实例上启动它
我想弄清楚为什么每次我重新启动 Storm 拓扑时我的所有 Kafka 消息都会被重播。
我的理解是,一旦最后一个 Bolt 确认了元组,spout 就应该在 Kafka 上提交消息,因此我不应该在重启后看到它重播。
我的代码是一个简单的 Kafka-spout 和一个 Bolt,它只打印每条消息然后确认它们。
private static KafkaSpout buildKafkaSpout(String topicName) {
ZkHosts zkHosts = new ZkHosts("localhost:2181");
SpoutConfig spoutConfig = new SpoutConfig(zkHosts,
topicName,
"/" + topicName,
"mykafkaspout"); /*was:UUID.randomUUID().toString()*/
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
return new KafkaSpout(spoutConfig);
}
public static class PrintBolt extends BaseRichBolt {
OutputCollector _collector;
public static Logger LOG = LoggerFactory.getLogger(PrintBolt.class);
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
LOG.error("PrintBolt.0: {}",tuple.getString(0));
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("nothing"));
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka", buildKafkaSpout("mytopic"), 1);
builder.setBolt("print1", new PrintBolt(),1).shuffleGrouping("kafka");
}
除了代码中的设置,我没有提供任何配置设置。
我是否缺少配置设置或我做错了什么?
更新:
澄清一下,在我重新启动管道之前一切正常。以下行为是我在其他(非风暴)消费者中可以获得的,以及我对 KafkaSpout
的期望我的期望:
然而,我使用默认设置的实际行为如下。消息处理得很好,直到我停止管道,然后当我重新启动时,我得到所有消息的重播,包括那些我认为我已经确认的消息(A 和 B)
实际情况:
根据 Matthias 提到的 configuration options,我可以将 startOffsetTime
更改为 Latest
,但这实际上是管道丢弃消息的最新位置(消息 "C") 是在管道重新启动时生成的。
我有一个用 NodeJS 编写的消费者(使用 npm kafka-node),它能够向 Kafka 确认消息,当我重新启动 NodeJs 消费者时,它完全符合我的预期(赶上消息 "C"是在消费者情绪低落并从那里继续时产生的)——那么我如何使用 KafkaSpout 获得相同的行为?
问题出在提交代码中——如果 storm jar
是 运行 而没有拓扑名称,提交拓扑的模板代码将创建 LocalCluster
的实例,并且本地集群不捕获状态,因此不捕获重放。
所以
$ storm jar myjar.jar storm.myorg.MyTopology topologyname
将在我的 single node development cluster 上启动它,其中
$ storm jar myjar.jar storm.myorg.MyTopology
将在 LocalCluster