带有 Kafka 偏移量管理的 Apache Storm
Apache Storm with Kafka offset management
我已经使用 Kafka 作为源使用 Storm 构建了一个示例拓扑。这是我需要解决的问题。
每次我杀死一个拓扑并重新启动它时,拓扑都会从头开始处理。
假设 Topic X 中的消息 A 由 Topology 处理,然后我 kill topology。
现在再次提交topology,Message A还在,还有Topic X,又处理了。
是否有解决方案,也许是某种偏移量管理来处理这种情况。
确保在创建 spoutconfig 时它有一个固定的 spout id,它可以在重启后识别自己。
来自 Storm 官方网站:
Important: When re-deploying a topology make sure that the settings
for SpoutConfig.zkRoot and SpoutConfig.id were not modified, otherwise
the spout will not be able to read its previous consumer state
information (i.e. the offsets) from ZooKeeper -- which may lead to
unexpected behavior and/or to data loss, depending on your use case.
您不应该将 storm-kafka
用于新代码,它已被弃用,因为底层客户端 API 在 Kafka 中已被弃用,并从 2.0.0 开始被删除。相反,使用 storm-kafka-client
.
对于 storm-kafka-client
你想设置一个组 ID 和第一个轮询偏移策略。
KafkaSpoutConfig.builder(bootstrapServers, "your-topic")
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
.setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
.build();
上面的代码会让你的 spout 在你第一次启动时从最早的偏移量开始,然后如果你重新启动它,它会从它停止的地方开始。 Kafka 在重新启动时使用 group id 来识别 spout,因此它可以取回存储的偏移量检查点。其他偏移策略的行为会有所不同,您可以查看 javadoc 中的 FirstPollOffsetStrategy 枚举。
spout 会定期检查它到达的距离,配置中也有一个设置来控制它。检查点由配置中的 setProcessingGuarantee
设置控制,并且可以设置为至少一次(仅检查点确认偏移量)、最多一次(spout 发出消息之前的检查点)和 "any times"(定期检查点,忽略确认)。
我也遇到了类似的问题,趁着这个问题问一下。我有这样的代码:
KafkaTridentSpoutConfig.Builder kafkaSpoutConfigBuilder = KafkaTridentSpoutConfig.builder(bootstrapServers, topic);
kafkaSpoutConfigBuilder.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, fetchSizeBytes);
kafkaSpoutConfigBuilder.setProp(ConsumerConfig.GROUP_ID_CONFIG, clientId);
kafkaSpoutConfigBuilder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
kafkaSpoutConfigBuilder.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaTridentSpoutOpaque(kafkaSpoutConfigBuilder.build());
但是每次我重新启动 Storm Local Cluster 时,消息都会从头开始读取。如果我直接在 Kafka 中检查特定组的偏移量,则没有延迟。就像没有读取 Kafka 的偏移量一样。
使用 Kafka 2.8,Storm 2.2.0。我在 Storm 0.9.X 中没有这个问题。
有什么想法吗?
谢谢!
我已经使用 Kafka 作为源使用 Storm 构建了一个示例拓扑。这是我需要解决的问题。
每次我杀死一个拓扑并重新启动它时,拓扑都会从头开始处理。
假设 Topic X 中的消息 A 由 Topology 处理,然后我 kill topology。
现在再次提交topology,Message A还在,还有Topic X,又处理了。
是否有解决方案,也许是某种偏移量管理来处理这种情况。
确保在创建 spoutconfig 时它有一个固定的 spout id,它可以在重启后识别自己。
来自 Storm 官方网站:
Important: When re-deploying a topology make sure that the settings for SpoutConfig.zkRoot and SpoutConfig.id were not modified, otherwise the spout will not be able to read its previous consumer state information (i.e. the offsets) from ZooKeeper -- which may lead to unexpected behavior and/or to data loss, depending on your use case.
您不应该将 storm-kafka
用于新代码,它已被弃用,因为底层客户端 API 在 Kafka 中已被弃用,并从 2.0.0 开始被删除。相反,使用 storm-kafka-client
.
对于 storm-kafka-client
你想设置一个组 ID 和第一个轮询偏移策略。
KafkaSpoutConfig.builder(bootstrapServers, "your-topic")
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
.setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
.build();
上面的代码会让你的 spout 在你第一次启动时从最早的偏移量开始,然后如果你重新启动它,它会从它停止的地方开始。 Kafka 在重新启动时使用 group id 来识别 spout,因此它可以取回存储的偏移量检查点。其他偏移策略的行为会有所不同,您可以查看 javadoc 中的 FirstPollOffsetStrategy 枚举。
spout 会定期检查它到达的距离,配置中也有一个设置来控制它。检查点由配置中的 setProcessingGuarantee
设置控制,并且可以设置为至少一次(仅检查点确认偏移量)、最多一次(spout 发出消息之前的检查点)和 "any times"(定期检查点,忽略确认)。
我也遇到了类似的问题,趁着这个问题问一下。我有这样的代码:
KafkaTridentSpoutConfig.Builder kafkaSpoutConfigBuilder = KafkaTridentSpoutConfig.builder(bootstrapServers, topic);
kafkaSpoutConfigBuilder.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, fetchSizeBytes);
kafkaSpoutConfigBuilder.setProp(ConsumerConfig.GROUP_ID_CONFIG, clientId);
kafkaSpoutConfigBuilder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
kafkaSpoutConfigBuilder.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaTridentSpoutOpaque(kafkaSpoutConfigBuilder.build());
但是每次我重新启动 Storm Local Cluster 时,消息都会从头开始读取。如果我直接在 Kafka 中检查特定组的偏移量,则没有延迟。就像没有读取 Kafka 的偏移量一样。 使用 Kafka 2.8,Storm 2.2.0。我在 Storm 0.9.X 中没有这个问题。 有什么想法吗?
谢谢!