TopicPartitionOffset 不适用于 SeekPosition.Timestamp
TopicPartitionOffset does not work for SeekPosition.Timestamp
当我想从 Kafka 检索数据时,应该可以从某个时间戳开始这样做。我的印象是,这可以通过使用带有第 4 个参数 SeekPosition.Timestamp 的 TopicPartitionOffset 来实现。但是,这不起作用。当我传递一个有效的 Instant 时(我知道它在有效日期范围内,即 kafka 有带有适当时间戳的记录)出现以下控制台消息:
Seeking to offset 1626779437 for partition secret-partition-0
Fetch offset 1626779437 is out of range for partition secret-partition-0, resetting offset.
Resetting offset for partition secret-partition-0 to offset 450108.
重置发生的偏移量是主题的最后一个偏移量。我用一个叫做 offset explorer 的工具检查了这个。所以它和使用
是一样的
return new TopicPartitionOffset(topic, p, SeekPosition.END);
然而,这仅在没有传递 Instant 时使用,效果很好。
当时间戳未计算到适当的偏移量时,将 TopicPartitionOffset 与时间戳一起使用的目的是什么?是否还需要通过Consumer及其offsetsForTimes方法手动计算offset?
代码有问题还是 TopicPartitionOffset 创建错误?
请参阅附件代码。另请注意,“orElseGet”仅在未传递任何 Instant 时调用,对于当前问题而言并非如此。 “.map”路径 运行 正确并产生一个值。
listenerContainer = contentKafkaListenerContainerFactory.createContainer(
partitions.stream()
.map(p ->
Optional.ofNullable(instant)
.map(i -> {
return new TopicPartitionOffset(topic, p, i.getEpochSecond(), SeekPosition.TIMESTAMP);
})
.orElseGet(() -> {
return new TopicPartitionOffset(topic, p, SeekPosition.BEGINNING);
})
)
.toArray(TopicPartitionOffset[]::new)
);
listenerContainer.setupMessageListener(kafkaMessageListener);
listenerContainer.start();
所以在同样的摆弄之后,这个问题似乎可以通过使用 ConsumerSeekAware 接口来解决。
除了
的用法外,我上面的代码是正确的
return new TopicPartitionOffset(topic, p, i.getEpochSecond(), SeekPosition.TIMESTAMP);
应该是:
return new TopicPartitionOffset(topic, p, i.toEpochMilli(), SeekPosition.TIMESTAMP);
似乎在分配分区时创建 TopicPartitionOffset 不会偏移分区,而只是作为偏移的容器,因此需要手动步骤,应用实际偏移。
public class KafkaTopicOffsetter implements ConsumerSeekAware {
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.forEach((topicPartition, aLong) -> {
callback.seekToTimestamp(topicPartition.topic(), topicPartition.partition(), aLong);
}
);
}
}
如果您使用 ConcurrentKafkaListenerContainerFactory 来创建您的侦听器,这似乎可以解决问题,就像我在最初的问题中所做的那样。
亲切的问候
当我想从 Kafka 检索数据时,应该可以从某个时间戳开始这样做。我的印象是,这可以通过使用带有第 4 个参数 SeekPosition.Timestamp 的 TopicPartitionOffset 来实现。但是,这不起作用。当我传递一个有效的 Instant 时(我知道它在有效日期范围内,即 kafka 有带有适当时间戳的记录)出现以下控制台消息:
Seeking to offset 1626779437 for partition secret-partition-0
Fetch offset 1626779437 is out of range for partition secret-partition-0, resetting offset.
Resetting offset for partition secret-partition-0 to offset 450108.
重置发生的偏移量是主题的最后一个偏移量。我用一个叫做 offset explorer 的工具检查了这个。所以它和使用
return new TopicPartitionOffset(topic, p, SeekPosition.END);
然而,这仅在没有传递 Instant 时使用,效果很好。
当时间戳未计算到适当的偏移量时,将 TopicPartitionOffset 与时间戳一起使用的目的是什么?是否还需要通过Consumer及其offsetsForTimes方法手动计算offset?
代码有问题还是 TopicPartitionOffset 创建错误? 请参阅附件代码。另请注意,“orElseGet”仅在未传递任何 Instant 时调用,对于当前问题而言并非如此。 “.map”路径 运行 正确并产生一个值。
listenerContainer = contentKafkaListenerContainerFactory.createContainer(
partitions.stream()
.map(p ->
Optional.ofNullable(instant)
.map(i -> {
return new TopicPartitionOffset(topic, p, i.getEpochSecond(), SeekPosition.TIMESTAMP);
})
.orElseGet(() -> {
return new TopicPartitionOffset(topic, p, SeekPosition.BEGINNING);
})
)
.toArray(TopicPartitionOffset[]::new)
);
listenerContainer.setupMessageListener(kafkaMessageListener);
listenerContainer.start();
所以在同样的摆弄之后,这个问题似乎可以通过使用 ConsumerSeekAware 接口来解决。 除了
的用法外,我上面的代码是正确的return new TopicPartitionOffset(topic, p, i.getEpochSecond(), SeekPosition.TIMESTAMP);
应该是:
return new TopicPartitionOffset(topic, p, i.toEpochMilli(), SeekPosition.TIMESTAMP);
似乎在分配分区时创建 TopicPartitionOffset 不会偏移分区,而只是作为偏移的容器,因此需要手动步骤,应用实际偏移。
public class KafkaTopicOffsetter implements ConsumerSeekAware {
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.forEach((topicPartition, aLong) -> {
callback.seekToTimestamp(topicPartition.topic(), topicPartition.partition(), aLong);
}
);
}
}
如果您使用 ConcurrentKafkaListenerContainerFactory 来创建您的侦听器,这似乎可以解决问题,就像我在最初的问题中所做的那样。
亲切的问候