TopicPartitionOffset 不适用于 SeekPosition.Timestamp

TopicPartitionOffset does not work for SeekPosition.Timestamp

当我想从 Kafka 检索数据时,应该可以从某个时间戳开始这样做。我的印象是,这可以通过使用带有第 4 个参数 SeekPosition.TimestampTopicPartitionOffset 来实现。但是,这不起作用。当我传递一个有效的 Instant 时(我知道它在有效日期范围内,即 kafka 有带有适当时间戳的记录)出现以下控制台消息:

Seeking to offset 1626779437 for partition secret-partition-0

Fetch offset 1626779437 is out of range for partition secret-partition-0, resetting offset.

Resetting offset for partition secret-partition-0 to offset 450108. 重置发生的偏移量是主题的最后一个偏移量。我用一个叫做 offset explorer 的工具检查了这个。所以它和使用

是一样的

return new TopicPartitionOffset(topic, p, SeekPosition.END);

然而,这仅在没有传递 Instant 时使用,效果很好。

当时间戳未计算到适当的偏移量时,将 TopicPartitionOffset 与时间戳一起使用的目的是什么?是否还需要通过Consumer及其offsetsForTimes方法手动计算offset?

代码有问题还是 TopicPartitionOffset 创建错误? 请参阅附件代码。另请注意,“orElseGet”仅在未传递任何 Instant 时调用,对于当前问题而言并非如此。 “.map”路径 运行 正确并产生一个值。

listenerContainer = contentKafkaListenerContainerFactory.createContainer(
                partitions.stream()
                    .map(p ->
                        Optional.ofNullable(instant)
                                .map(i -> {
                                    return new TopicPartitionOffset(topic, p, i.getEpochSecond(), SeekPosition.TIMESTAMP);
                                })

                                .orElseGet(() -> {
                                    return new TopicPartitionOffset(topic, p, SeekPosition.BEGINNING);                  
                                })
                    )
                    .toArray(TopicPartitionOffset[]::new)
        );

        listenerContainer.setupMessageListener(kafkaMessageListener);
        listenerContainer.start();

所以在同样的摆弄之后,这个问题似乎可以通过使用 ConsumerSeekAware 接口来解决。 除了

的用法外,我上面的代码是正确的
return new TopicPartitionOffset(topic, p, i.getEpochSecond(), SeekPosition.TIMESTAMP);

应该是:

return new TopicPartitionOffset(topic, p, i.toEpochMilli(), SeekPosition.TIMESTAMP);

似乎在分配分区时创建 TopicPartitionOffset 不会偏移分区,而只是作为偏移的容器,因此需要手动步骤,应用实际偏移。

public class KafkaTopicOffsetter implements ConsumerSeekAware {
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.forEach((topicPartition, aLong) -> {
                    callback.seekToTimestamp(topicPartition.topic(), topicPartition.partition(), aLong);
                }
        );
    }
}

如果您使用 ConcurrentKafkaListenerContainerFactory 来创建您的侦听器,这似乎可以解决问题,就像我在最初的问题中所做的那样。

亲切的问候