Spring Kafka监听所有topic并调整partition offsets
Spring Kafka Listening to all topics and adjusting partition offsets
根据 spring-kafka 上的文档,我正在使用基于注释的 @KafkaListener 来配置我的消费者。
我看到的是-
除非我将偏移量指定为零,否则在启动时,Kafka 消费者会选择未来的消息而不是现有的消息。 (我知道这是预期的结果,因为我没有指定我想要的偏移量)
我在文档中看到一个选项,用于指定主题 + 分区组合以及零偏移量,但如果我这样做 - 我必须明确指定我想要我的消费者的主题听.
使用上面的方法 2,这就是我的消费者现在的样子 -
@KafkaListener(id = "{group.id}",
topicPartitions = {
@TopicPartition(topic = "${kafka.topic.name}",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))
},
containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload String payload,
Acknowledgment ack) throws InterruptedException, IOException {
logger.debug("This is what we received in the Kafka Consumer = " + payload);
idService.process(payload);
ack.acknowledge();
}
虽然我知道有一个选项可以指定 "topicPattern" 通配符或 "topics" 列表作为注释配置的一部分,但我看不到可以提供的地方列出的主题/主题模式从零开始的偏移值。有没有办法将两者结合起来?请指教
当使用主题和主题模式(而不是显式声明分区)时,Kafka 决定哪个消费者实例将获得哪个分区。
Kafka 将分配分区,初始偏移量将是该组 ID 的最后提交。您目前无法更改该偏移量,但我们正在考虑添加 seek function.
如果您总是想从第一个可用偏移量开始,请使用唯一的组 ID(例如 UUID.randomUUID().toString()
)并设置
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
由于 Kafka 没有针对该组 ID 的现有偏移量,它将使用 属性 来确定从哪里开始。
您也可以使用 MANUAL ack 模式而不是 ack,这将有效地做同样的事情。
根据 spring-kafka 上的文档,我正在使用基于注释的 @KafkaListener 来配置我的消费者。
我看到的是-
除非我将偏移量指定为零,否则在启动时,Kafka 消费者会选择未来的消息而不是现有的消息。 (我知道这是预期的结果,因为我没有指定我想要的偏移量)
我在文档中看到一个选项,用于指定主题 + 分区组合以及零偏移量,但如果我这样做 - 我必须明确指定我想要我的消费者的主题听.
使用上面的方法 2,这就是我的消费者现在的样子 -
@KafkaListener(id = "{group.id}",
topicPartitions = {
@TopicPartition(topic = "${kafka.topic.name}",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))
},
containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload String payload,
Acknowledgment ack) throws InterruptedException, IOException {
logger.debug("This is what we received in the Kafka Consumer = " + payload);
idService.process(payload);
ack.acknowledge();
}
虽然我知道有一个选项可以指定 "topicPattern" 通配符或 "topics" 列表作为注释配置的一部分,但我看不到可以提供的地方列出的主题/主题模式从零开始的偏移值。有没有办法将两者结合起来?请指教
当使用主题和主题模式(而不是显式声明分区)时,Kafka 决定哪个消费者实例将获得哪个分区。
Kafka 将分配分区,初始偏移量将是该组 ID 的最后提交。您目前无法更改该偏移量,但我们正在考虑添加 seek function.
如果您总是想从第一个可用偏移量开始,请使用唯一的组 ID(例如 UUID.randomUUID().toString()
)并设置
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
由于 Kafka 没有针对该组 ID 的现有偏移量,它将使用 属性 来确定从哪里开始。
您也可以使用 MANUAL ack 模式而不是 ack,这将有效地做同样的事情。