为什么要为 Kafka 使用我们自己的 Offset Repository
Why to use our own Offset Repository for Kafka
我目前正在尝试为一个项目配置Kafka。我对偏移存储库有点困惑。我知道这两个开箱即用的选项存储在 Kafka 或 Zookeeper 中,我还发现还有另一个选项(至少如果我们将 kafka 与 Apache camel 结合使用)我也可以在任何地方定义和初始化我自己的存储库文件系统。
我的问题是:是否存在存储在Kafka或Zookeeper中的偏移量存储库不持久或不一致的情况?我最关心的是系统的任何部分(消费者、生产者、kafka 实例或 zookeeper 实例)意外关闭的情况。重新启动这些组件后,消费者是否会获得停止从主题读取的偏移量?
共享 volume/storage 上的显式存储库是否有意义?
如果我的问题有任何不清楚的地方,请告诉我。
您可以将偏移量存储在应用程序附近并设置偏移量值以开始使用来自 Kafka 的记录。这完全取决于你想做什么。
is there any scenario where offset repository stored in Kafka or Zookeeper is not persistent or consistent?
一时想不起来了。
I am mostly concerned about the situation where any part of the system (consumer, producer, kafka instance or zookeeper instance) shuts down unexpectedly. Once those components are restarted, will the consumer get the offset where it stopped reading from the topic?
这取决于您的应用程序的配置。如果您的应用程序在无法连接到 Kafka 时崩溃,并且该应用程序配置为从 EARLIEST
开始使用。您将开始使用有关该主题的最早记录。
如果您使用 LATEST
,它将从最后一次读取(并提交)的偏移量重新开始。
如果您想跳到主题中保存的最新记录,您需要告诉您的消费者..
#example
//seek to end of the topic
consumer.seekToEnd();
请在 Confluent 网站和 Apache Kafka 网站上找到更多详细信息。
我目前正在尝试为一个项目配置Kafka。我对偏移存储库有点困惑。我知道这两个开箱即用的选项存储在 Kafka 或 Zookeeper 中,我还发现还有另一个选项(至少如果我们将 kafka 与 Apache camel 结合使用)我也可以在任何地方定义和初始化我自己的存储库文件系统。
我的问题是:是否存在存储在Kafka或Zookeeper中的偏移量存储库不持久或不一致的情况?我最关心的是系统的任何部分(消费者、生产者、kafka 实例或 zookeeper 实例)意外关闭的情况。重新启动这些组件后,消费者是否会获得停止从主题读取的偏移量?
共享 volume/storage 上的显式存储库是否有意义?
如果我的问题有任何不清楚的地方,请告诉我。
您可以将偏移量存储在应用程序附近并设置偏移量值以开始使用来自 Kafka 的记录。这完全取决于你想做什么。
is there any scenario where offset repository stored in Kafka or Zookeeper is not persistent or consistent?
一时想不起来了。
I am mostly concerned about the situation where any part of the system (consumer, producer, kafka instance or zookeeper instance) shuts down unexpectedly. Once those components are restarted, will the consumer get the offset where it stopped reading from the topic?
这取决于您的应用程序的配置。如果您的应用程序在无法连接到 Kafka 时崩溃,并且该应用程序配置为从 EARLIEST
开始使用。您将开始使用有关该主题的最早记录。
如果您使用 LATEST
,它将从最后一次读取(并提交)的偏移量重新开始。
如果您想跳到主题中保存的最新记录,您需要告诉您的消费者..
#example
//seek to end of the topic
consumer.seekToEnd();
请在 Confluent 网站和 Apache Kafka 网站上找到更多详细信息。