如何向生产者请求Kafka中不存在的起始位置的数据?

How to request data from producer at beginning position that does not exist in Kafka?

我有一个包含时间序列数据的数据库,并将此数据发送到 Kafka。

许多消费者基于这些数据构建聚合和报告。 我的 Kafka 集群以 TTL 存储数据 1 天。

但是我如何构建一个新的报告和 运行 从第 0 个位置开始的新消费者,它在 Kafka 中不存在但在源存储中存在。

例如 - 如果我请求 Kafka 中不存在的偏移量,生产者的一些回调?

如果不可能,请告知其他架构解决方案。我想使用相同的代码库来聚合这些数据。

For example - some callback for the producer if I request an offset that does not exist in Kafka?

如果 Kafka 中不存在数据,您将无法使用它,更不用说在它上面做任何聚合了。

此外,没有消费者请求生产者的概念。生产者将数据发送给 Kafka 经纪人,消费者从这些经纪人那里消费。生产者和消费者之间没有直接交互。


既然你说数据仍然存在于源数据库中,那么你可以从那里获取数据并将其复制到 Kafka。

当您再次生成该数据时,它们将成为新消息,最终将像往常一样被消费者使用。 如果您想区分初始消费和 re-consumption,您可以将这些消息生成到一个新主题并让您的消费者从中消费。

其他方法是增加你的 TTL(我想你说的 TTL 是指 Kafka 中的 retention),然后你可以返回到 timestamp 在消费者中使用 offsetsForTimes(Map<TopicPartition,Long> timestampToSearch)seek(TopicPartition topicPartition, long offset) 方法。