如何确定主题已被 Kafka Stream 应用程序从 Java 应用程序的第一个偏移量到最后一个偏移量完全读取
How to determine topic has been read completely by Kafka Stream application from very first offset to last offset from Java application
我需要 Kafka Streams 方面的帮助。我已经启动了一个 Kafka 流应用程序,它从第一个偏移量开始流式传输一个主题。 Topic 的数据非常庞大,所以我想在我的应用程序中实现一种机制,使用 Kafka 流,以便在 topic 被完全读取到最后一个偏移量时我可以得到通知。
我已经阅读了 Kafka Streams 2.8.0 api,我找到了一个 api 方法,即 allLocalStorePartitionLags,它将商店名称映射返回到另一个包含所有滞后信息的分区映射每个分区。此方法 returns 滞后此流本地的所有存储分区(活动或备用)的信息。这种方法对我来说非常有用,在上面的例子中,当我有一个节点运行那个流应用程序时。
但在我的例子中,系统是分布式的,应用程序节点是 3 个,主题分区是 10 个,这意味着每个节点至少有 3 个分区供主题读取。
我需要帮助。我如何实现此功能,当主题已从分区 0 完全读取到分区 9 时,我可以收到通知。请注意,到目前为止,我无法选择在此处使用数据库。
也欢迎其他实现目标的方法。谢谢。
我能够从 adminClient api 获取滞后信息。下面的代码结果针对给定流应用程序即 applicationId.
读取的主题,每个分区的结束偏移量和当前偏移量
AdminClient adminClient = AdminClient.create(kafkaProperties);
ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(applicationId);
// Current offsets.
Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get();
// all topic partitions.
Set<TopicPartition> topicPartitions = topicPartitionOffsetAndMetadataMap.keySet();
// list of end offsets for each partitions.
ListOffsetsResult listOffsetsResult = adminClient.listOffsets(topicPartitions.stream()
.collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())));
我需要 Kafka Streams 方面的帮助。我已经启动了一个 Kafka 流应用程序,它从第一个偏移量开始流式传输一个主题。 Topic 的数据非常庞大,所以我想在我的应用程序中实现一种机制,使用 Kafka 流,以便在 topic 被完全读取到最后一个偏移量时我可以得到通知。
我已经阅读了 Kafka Streams 2.8.0 api,我找到了一个 api 方法,即 allLocalStorePartitionLags,它将商店名称映射返回到另一个包含所有滞后信息的分区映射每个分区。此方法 returns 滞后此流本地的所有存储分区(活动或备用)的信息。这种方法对我来说非常有用,在上面的例子中,当我有一个节点运行那个流应用程序时。
但在我的例子中,系统是分布式的,应用程序节点是 3 个,主题分区是 10 个,这意味着每个节点至少有 3 个分区供主题读取。
我需要帮助。我如何实现此功能,当主题已从分区 0 完全读取到分区 9 时,我可以收到通知。请注意,到目前为止,我无法选择在此处使用数据库。
也欢迎其他实现目标的方法。谢谢。
我能够从 adminClient api 获取滞后信息。下面的代码结果针对给定流应用程序即 applicationId.
读取的主题,每个分区的结束偏移量和当前偏移量AdminClient adminClient = AdminClient.create(kafkaProperties);
ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(applicationId);
// Current offsets.
Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get();
// all topic partitions.
Set<TopicPartition> topicPartitions = topicPartitionOffsetAndMetadataMap.keySet();
// list of end offsets for each partitions.
ListOffsetsResult listOffsetsResult = adminClient.listOffsets(topicPartitions.stream()
.collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())));