Kafka Stream Process API 批量提交(基于消息数量)
Kafka Stream Process API batch commit (based number of messages)
我正在使用 Kafka 处理器 API 我不想只使用基于时间的方法来决定何时在任务中提交已处理的消息,但我会根据许多已处理的消息或超时。
在 Java?
中有什么方法可以实现吗?
处理器 API 允许您通过 ProcessorContext#commit()
"request" 提交。调用此方法,Kafka Streams 将尽快提交。这应该允许您实现一些 Processor
内部计数器并在此计数器中调用 commit() 基础。
此外,您可以在那里使用配置提交间隔,或者将其设置为 Long.MAX_VALUE
.
以有效禁用它
您还可以根据事件时间或挂钟时间安排标点符号并从那里调用 commit() 以获得您想要的 "timeout" 行为。
我正在使用 Kafka 处理器 API 我不想只使用基于时间的方法来决定何时在任务中提交已处理的消息,但我会根据许多已处理的消息或超时。 在 Java?
中有什么方法可以实现吗?处理器 API 允许您通过 ProcessorContext#commit()
"request" 提交。调用此方法,Kafka Streams 将尽快提交。这应该允许您实现一些 Processor
内部计数器并在此计数器中调用 commit() 基础。
此外,您可以在那里使用配置提交间隔,或者将其设置为 Long.MAX_VALUE
.
您还可以根据事件时间或挂钟时间安排标点符号并从那里调用 commit() 以获得您想要的 "timeout" 行为。