Kafka Streams:提交不会发生

Kafka Streams : Commit doesn't happen

我是 Kafka Streams 的新手,我正在尝试在超时情况下试验 kafka 流的行为。

这是我使用处理器 API 测试的场景:

事情是这样的: 在第一次尝试时,它开始使用来自 kafka 主题的消息,并在调用 process() 时开始休眠。 60000 毫秒后,它再次调用 process 方法,没有抛出任何异常,但此时它在 20000 毫秒打印后退出睡眠,"done sleeping" 并将消息发布到输出主题。在此之后,它再次开始使用来自相同偏移量的相同消息而不提交。这是循环发生的。

示例输出:

关键是:12345678

开始时间:2018/07/09 07:34:25

关键是:12345678

开始时间:2018/07/09 07:35:27

睡觉了

结束时间:2018/07/09 07:35:45

Offset*****224 partitionId****0taskId*****0_0javaThreadId*******12 value*****abc

关键是:12345678

开始时间:2018/07/09 07:36:27

睡觉了

结束时间:2018/07/09 07:36:47

Offset*****224 partitionId****0taskId*****0_0javaThreadId*******14 value*****abc

关键是:12345678

开始时间:2018/07/09 07:37:27

睡觉了

结束时间:2018/07/09 07:37:47

Offset*****224 partitionId****0taskId*****0_0javaThreadId*******12 value*****abc

附加信息:

提前致谢。

不是 100% 确定,但是请注意:如果你调用 context#commit() 你只 "request" 一个提交并且 Kafka Streams 尝试尽快提交——但是在 context#commit() 返回之后,提交还没有发生...

另请注意,如果您的超时时间为 60.000 并且您休眠了 80.000,则您的应用程序应该从消费者组中删除,因此不允许再提交。对于这种情况,日志中应该有 WARN 日志消息。

希望对您有所帮助。