如何始终从 kafka-streams 中的最新偏移量消费
How to always consume from latest offset in kafka-streams
我们的要求是,如果 kafka-stream 应用正在使用分区,它应该从该分区的最新偏移量开始使用。
这似乎可以使用
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
现在,假设使用上述配置,kafka-stream 应用程序开始使用分区最新偏移量的数据。一段时间后,应用程序崩溃了。当应用程序重新上线时,我们希望它使用该分区的最新偏移量的数据,而不是它上次读取的位置。
但是我找不到任何可以帮助使用 kafka-streams 实现它的东西 api。
P.S。我们正在使用 kafka-1.0.0.
开箱即用不支持。
配置 auto.offset.reset
仅在没有提交的偏移量且没有配置更改此行为时触发。
您可以在启动前手动操作偏移量
尽管使用 bin/kafka-consumer-groups.sh
——application.id
是
group.id
并且您可以 "seek to end" 在重新启动应用程序之前。
更新:
自 1.1.0 版本开始,您可以使用 bin/kafka-streams-application-reset.sh
工具来设置起始偏移量。要使用该工具,应用程序必须处于离线状态。 (比照:https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application)
我们的要求是,如果 kafka-stream 应用正在使用分区,它应该从该分区的最新偏移量开始使用。
这似乎可以使用
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
现在,假设使用上述配置,kafka-stream 应用程序开始使用分区最新偏移量的数据。一段时间后,应用程序崩溃了。当应用程序重新上线时,我们希望它使用该分区的最新偏移量的数据,而不是它上次读取的位置。
但是我找不到任何可以帮助使用 kafka-streams 实现它的东西 api。
P.S。我们正在使用 kafka-1.0.0.
开箱即用不支持。
配置 auto.offset.reset
仅在没有提交的偏移量且没有配置更改此行为时触发。
您可以在启动前手动操作偏移量
尽管使用 bin/kafka-consumer-groups.sh
——application.id
是
group.id
并且您可以 "seek to end" 在重新启动应用程序之前。
更新:
自 1.1.0 版本开始,您可以使用 bin/kafka-streams-application-reset.sh
工具来设置起始偏移量。要使用该工具,应用程序必须处于离线状态。 (比照:https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application)