如何找出 Kafka 主题的最新偏移量以了解我的 reader 何时与主题保持同步?
How to find out the latest offset of a Kafka topic to know when my reader is up-to-date with topic?
我有一台服务器需要保留所有用户的内存缓存。因此,假设列表不会很大 - 几十万个项目,我想使用带有键控消息的 Kafka 主题,其中键是 userId 以保持该列表的当前状态,管理应用程序将发送新的用户对象当事情发生变化时转到那个话题。因此,当服务器启动时,它只需要从头开始读取该主题的所有内容并填充它的缓存。
填充阶段大约需要 20-30 秒,具体取决于与 Kafka 的连接,因此服务器不需要在线,直到它从主题中读取所有内容以获得最新的缓存(主题中的所有消息在开始的那一刻被认为是最新的)。但是我不知道如何确定我是否从 Kafka 流中读取了所有内容以通知其他服务缓存已填充并且服务器可以启动服务器请求。我读过有关高水位线的信息,但未在 Java 消费者 API 中看到它。
那么如何找出 Kafka 主题的最新偏移量以了解我的 reader 何时是最新的?
假设您使用的是高级消费者。
High watermark 在 High level consumer 中不可用。
**As you mentioned: all the messages in the topic at the moment of start is considered up-to-date**
当您的应用程序启动时,您可以使用 SimpleConsumer Api 执行以下操作:-
通过向kafka集群中的任意broker发出TopicMetadataRequest来查找topic中的分区数
创建分区到 latestOffset 映射,其中键是分区,值是该分区中可用的 latestOffset。
Map offsetMap = new HashMap<>()
对于主题中的每个分区 p:
一个。找到分区 p
的领导者
乙。向领导者发送 OffsetRequest
C。从 OffsetResponse
获取最新的 Offset
D.向 offsetMap 添加一个条目,其中键是分区 p,偏移量是
latestOffset.
开始使用高级消费者从kafka读取消息:
一个。对于从 KafkaStream 收到的每条消息:
AA. Get the partition && offset of the message
BB. if( offsetMap.get(partition)<=offset) stop Reading from this steam
希望对您有所帮助。
我有一台服务器需要保留所有用户的内存缓存。因此,假设列表不会很大 - 几十万个项目,我想使用带有键控消息的 Kafka 主题,其中键是 userId 以保持该列表的当前状态,管理应用程序将发送新的用户对象当事情发生变化时转到那个话题。因此,当服务器启动时,它只需要从头开始读取该主题的所有内容并填充它的缓存。
填充阶段大约需要 20-30 秒,具体取决于与 Kafka 的连接,因此服务器不需要在线,直到它从主题中读取所有内容以获得最新的缓存(主题中的所有消息在开始的那一刻被认为是最新的)。但是我不知道如何确定我是否从 Kafka 流中读取了所有内容以通知其他服务缓存已填充并且服务器可以启动服务器请求。我读过有关高水位线的信息,但未在 Java 消费者 API 中看到它。
那么如何找出 Kafka 主题的最新偏移量以了解我的 reader 何时是最新的?
假设您使用的是高级消费者。
High watermark 在 High level consumer 中不可用。
**As you mentioned: all the messages in the topic at the moment of start is considered up-to-date**
当您的应用程序启动时,您可以使用 SimpleConsumer Api 执行以下操作:-
通过向kafka集群中的任意broker发出TopicMetadataRequest来查找topic中的分区数
创建分区到 latestOffset 映射,其中键是分区,值是该分区中可用的 latestOffset。
Map
offsetMap = new HashMap<>() 对于主题中的每个分区 p:
一个。找到分区 p
的领导者乙。向领导者发送 OffsetRequest
C。从 OffsetResponse
获取最新的 OffsetD.向 offsetMap 添加一个条目,其中键是分区 p,偏移量是 latestOffset.
开始使用高级消费者从kafka读取消息:
一个。对于从 KafkaStream 收到的每条消息:
AA. Get the partition && offset of the message BB. if( offsetMap.get(partition)<=offset) stop Reading from this steam
希望对您有所帮助。