Kafka Streams 在生产者抛出异常时提交偏移量
Kafka Streams commits offset when producer throws an exception
在我的 Kafka 流应用程序中,我有一个处理器,计划每 60 秒生成一次输出消息。输出消息是根据来自单个输入主题的消息构建的。有时会发生输出消息大于代理配置的限制(默认为 1MB)。抛出异常并关闭应用程序。提交间隔设置为默认值 (60s)。
在这种情况下,我希望在接下来的 运行 中,所有在崩溃前 60 年代消费的消息都将被重新消费。但实际上这些消息的偏移量已提交,并且不会在下一个 运行.
再次处理消息
阅读 的答案在我看来不应该提交偏移量。当我将提交间隔增加到 120 秒(处理器仍然每 60 秒打断一次)时,它会按预期工作并且不会提交偏移量。
我正在使用默认处理保证,但我也尝试过 exactly_once
。两者都有相同的结果。从处理器调用 context.commit()
似乎对问题没有影响。
我是不是做错了什么?
其中一些将取决于您使用的客户端以及它是否基于 librdkafka。
部分答案还取决于您 "looping" 对 "poll" 方法的了解程度。典型示例类似于 https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html 处 "Automatic Offset Committing" 下的代码
但这假设一个非常快速的轮询循环(100 毫秒 + 处理时间)和 1000 毫秒的 auto.commit.timeout.ms
(默认通常为 5000 毫秒)。
如果我没看错你的问题,你似乎每 60 秒消费一次消息?
需要注意的是,kafka 客户端的行为与调用 poll
的频率密切相关(某些库会将 poll 包装在 "Consume" 方法之类的东西中)。频繁调用 poll 很重要,以便向代理显示 "alive"。如果您不至少每隔 max.poll.interval.ms
(默认 5 分钟)轮询一次,您将获得其他异常。它可能导致客户被踢出他们的消费群体。
总之,就这一点而言...auto.commit.interval.ms
只是最大值。如果消息已 accepted/acknowledged 或已使用 StoreOffset,则在轮询时,客户端可以决定更新代理上的偏移量。可能是由于客户端缓冲区大小被命中或其他一些语义。
另一件要看的事情(特别是如果使用基于 librdkafka 的客户端。其他人有类似的东西)是 enable.auto.offset.store
(默认为真)这将 "Automatically store offset of last message provided to application" 所以每次你 poll/consume来自客户端的消息将 StoreOffset。如果您还使用 auto.commit,那么您的偏移量可能会以您意想不到的方式移动。
有关 librdkafka 的全套配置,请参阅 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md。
consuming/acknowledging有many/many种方式。我认为对于您的情况,配置页面上 max.poll.interval.ms
的评论可能是相关的。
”
注意:对于长时间处理的应用,建议设置enable.auto.offset.store=false,然后在消息处理后显式存储偏移量(使用offsets_store())
“
抱歉,"answer" 有点啰嗦。我希望有一些话题可以让你继续。
Kafka Streams 中 Processor
的约定是,您 完全 处理了输入记录和 forward()
所有相应的输出消息之前 process()
return。 -- 这个合约意味着允许Kafka Streams在process()
return秒后提交相应的偏移量。
您 "buffer" 内存中的 process()
消息似乎稍后会发出。这违反了本合同。如果你想要 "buffer" 消息,你应该将状态存储附加到 Processor
并将所有这些消息放入存储中(参见 https://kafka.apache.org/25/documentation/streams/developer-guide/processor-api.html#state-stores)。该商店由 Kafka Streams 为您管理,并且具有容错能力。这样,在发生错误后状态将恢复并且您不会丢失任何数据(即使输入消息未被重新处理)。
我怀疑将提交间隔设置为 120 秒是否真的适用于所有情况,因为提交发生的时间和标点符号调用的时间之间没有对齐。
在我的 Kafka 流应用程序中,我有一个处理器,计划每 60 秒生成一次输出消息。输出消息是根据来自单个输入主题的消息构建的。有时会发生输出消息大于代理配置的限制(默认为 1MB)。抛出异常并关闭应用程序。提交间隔设置为默认值 (60s)。
在这种情况下,我希望在接下来的 运行 中,所有在崩溃前 60 年代消费的消息都将被重新消费。但实际上这些消息的偏移量已提交,并且不会在下一个 运行.
再次处理消息阅读
我正在使用默认处理保证,但我也尝试过 exactly_once
。两者都有相同的结果。从处理器调用 context.commit()
似乎对问题没有影响。
我是不是做错了什么?
其中一些将取决于您使用的客户端以及它是否基于 librdkafka。
部分答案还取决于您 "looping" 对 "poll" 方法的了解程度。典型示例类似于 https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html 处 "Automatic Offset Committing" 下的代码
但这假设一个非常快速的轮询循环(100 毫秒 + 处理时间)和 1000 毫秒的 auto.commit.timeout.ms
(默认通常为 5000 毫秒)。
如果我没看错你的问题,你似乎每 60 秒消费一次消息?
需要注意的是,kafka 客户端的行为与调用 poll
的频率密切相关(某些库会将 poll 包装在 "Consume" 方法之类的东西中)。频繁调用 poll 很重要,以便向代理显示 "alive"。如果您不至少每隔 max.poll.interval.ms
(默认 5 分钟)轮询一次,您将获得其他异常。它可能导致客户被踢出他们的消费群体。
总之,就这一点而言...auto.commit.interval.ms
只是最大值。如果消息已 accepted/acknowledged 或已使用 StoreOffset,则在轮询时,客户端可以决定更新代理上的偏移量。可能是由于客户端缓冲区大小被命中或其他一些语义。
另一件要看的事情(特别是如果使用基于 librdkafka 的客户端。其他人有类似的东西)是 enable.auto.offset.store
(默认为真)这将 "Automatically store offset of last message provided to application" 所以每次你 poll/consume来自客户端的消息将 StoreOffset。如果您还使用 auto.commit,那么您的偏移量可能会以您意想不到的方式移动。
有关 librdkafka 的全套配置,请参阅 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md。
consuming/acknowledging有many/many种方式。我认为对于您的情况,配置页面上 max.poll.interval.ms
的评论可能是相关的。
” 注意:对于长时间处理的应用,建议设置enable.auto.offset.store=false,然后在消息处理后显式存储偏移量(使用offsets_store()) “
抱歉,"answer" 有点啰嗦。我希望有一些话题可以让你继续。
Kafka Streams 中 Processor
的约定是,您 完全 处理了输入记录和 forward()
所有相应的输出消息之前 process()
return。 -- 这个合约意味着允许Kafka Streams在process()
return秒后提交相应的偏移量。
您 "buffer" 内存中的 process()
消息似乎稍后会发出。这违反了本合同。如果你想要 "buffer" 消息,你应该将状态存储附加到 Processor
并将所有这些消息放入存储中(参见 https://kafka.apache.org/25/documentation/streams/developer-guide/processor-api.html#state-stores)。该商店由 Kafka Streams 为您管理,并且具有容错能力。这样,在发生错误后状态将恢复并且您不会丢失任何数据(即使输入消息未被重新处理)。
我怀疑将提交间隔设置为 120 秒是否真的适用于所有情况,因为提交发生的时间和标点符号调用的时间之间没有对齐。