使用 Kafka Streams DSL 时如何处理错误和不提交

How to handle error and don't commit when use Kafka Streams DSL

对于Kafka Streams,如果我们使用低级处理器API,我们可以控制是否提交。因此,如果我们的代码出现问题,并且我们不想提交此消息。在这种情况下,Kafka 将多次重新发送此消息,直到问题得到解决。

但是如何在使用其高层流DSL时控制是否提交消息API?

资源:

http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html

你的说法不完全正确。您 不能 "control to commit or not" -- 至少不能直接(既不在处理器 API 中也不在 DSL 中)。您只能使用 ProcessorContext#commit() 来请求 额外的 提交。因此,在调用 #commit() 之后,Streams 尝试尽快提交,但不是立即提交。此外,即使您从未调用 #commit(),Streams 也会自动提交。您可以通过 Streams 配置控制 Streams 提交间隔 commit.interval.m(参见 http://docs.confluent.io/current/streams/developer-guide.html#configuring-a-kafka-streams-application

如果是 "problem",这取决于您遇到的问题类型:

  • 如果您检测到无法恢复的问题,您只能抛出异常并"stop the world"(参见下文)。
  • 如果您有可恢复的错误,您需要在您自己的代码中 "loop"(例如,在 Processor#process()KeyValueMapper#apply() 中,直到问题得到解决并且您可以成功处理当前消息(请注意,您可能 运行 超时,即异常,使用此策略 - 参见消费者配置 heartbeat.interval.ms 和 0.10.1 session.timeout.ms [KIP-62] )
  • 另一种方法是,将现在无法处理的记录放入 StateStore 中,稍后再处理。然而,很难做到正确并且还打破了一些 Streams 假设(例如,处理顺序)。不建议使用,如果使用,必须非常小心其影响

如果存在未捕获的异常StreamThread 将终止并且不会发生任何提交(您可以注册一个异常处理程序以获得有关此的通知:http://docs.confluent.io/current/streams/developer-guide.html#using-kafka-streams-within-your-application-code。如果你 StreamThread 都死了,你将需要创建一个新的 KafkaStreams 实例来重新启动你的应用程序。

在消息被成功处理之前,您不能从用户代码 return,因为如果您 return,Streams 假定消息已被成功处理(因此可能会提交相应的偏移量)。关于要点(3),将记录放入特殊的StateStore中供以后处理被认为是"successfully"已处理的记录。