检查风暴消费者的 Kafka 主题的偏移量

Checking Offset of Kafka topic for a storm consumer

我正在使用 storm-kafka-client 1.2.1 并为 KafkaTridentSpoutOpaque 创建我的 spout 配置,如下所示

            kafkaSpoutConfig = KafkaSpoutConfig.builder(brokerURL, kafkaTopic)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG,"storm-kafka-group")
                .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
                .setProp(ConsumerConfig.CLIENT_ID_CONFIG,InetAddress.getLocalHost().getHostName())

我在 Kafka 和 Zookeeper 中都找不到我的组 ID 和偏移量。通过 Zookeeper 我尝试了 zkCli.sh 并尝试了 ls /consumers 但有 none 因为我认为 Kafka 本身现在正在维护偏移量而不是 zookeeper。

我也用下面的命令尝试了 Kafka

bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand  --list  --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
console-consumer-20130
console-consumer-82696
console-consumer-6106
console-consumer-67393
console-consumer-14333
console-consumer-21174
console-consumer-64550

有人能帮我找到我的偏移量吗?如果我重新启动拓扑,它会再次在 Kafka 中重播我的事件吗?

Trident 不在 Kafka 中存储偏移量,而是在 Storm 的 Zookeeper 中。如果您是 运行 Storm 的 Zookeeper 配置的默认设置,Storm 的 Zookeeper 中的路径将类似于 /coordinator/<your-topology-id>/meta

该路径下的对象将包含第一个和最后一个偏移量,以及每个批次的主题分区。所以例如/coordinator/<your-topology-id>/meta/15 将包含批号 15 中发出的第一个和最后一个偏移量。

spout 是否在重启后重播偏移由您在 KafkaSpoutConfig 中设置的 FirstPollOffsetStrategy 控制。默认值为 UNCOMMITTED_EARLIEST,重启时不会重新开始。请参阅 https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L126.

处的 Javadoc