使用 Kafka Streams DSL 时如何处理错误和不提交
How to handle error and don't commit when use Kafka Streams DSL
对于Kafka Streams,如果我们使用低级处理器API,我们可以控制是否提交。因此,如果我们的代码出现问题,并且我们不想提交此消息。在这种情况下,Kafka 将多次重新发送此消息,直到问题得到解决。
但是如何在使用其高层流DSL时控制是否提交消息API?
资源:
http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html
你的说法不完全正确。您 不能 "control to commit or not" -- 至少不能直接(既不在处理器 API 中也不在 DSL 中)。您只能使用 ProcessorContext#commit()
来请求 额外的 提交。因此,在调用 #commit()
之后,Streams 尝试尽快提交,但不是立即提交。此外,即使您从未调用 #commit()
,Streams 也会自动提交。您可以通过 Streams 配置控制 Streams 提交间隔 commit.interval.m
(参见 http://docs.confluent.io/current/streams/developer-guide.html#configuring-a-kafka-streams-application)
如果是 "problem",这取决于您遇到的问题类型:
- 如果您检测到无法恢复的问题,您只能抛出异常并"stop the world"(参见下文)。
- 如果您有可恢复的错误,您需要在您自己的代码中 "loop"(例如,在
Processor#process()
或 KeyValueMapper#apply()
中,直到问题得到解决并且您可以成功处理当前消息(请注意,您可能 运行 超时,即异常,使用此策略 - 参见消费者配置 heartbeat.interval.ms
和 0.10.1 session.timeout.ms
[KIP-62] )
- 另一种方法是,将现在无法处理的记录放入
StateStore
中,稍后再处理。然而,很难做到正确并且还打破了一些 Streams 假设(例如,处理顺序)。不建议使用,如果使用,必须非常小心其影响
如果存在未捕获的异常,StreamThread
将终止并且不会发生任何提交(您可以注册一个异常处理程序以获得有关此的通知:http://docs.confluent.io/current/streams/developer-guide.html#using-kafka-streams-within-your-application-code。如果你 StreamThread
都死了,你将需要创建一个新的 KafkaStreams
实例来重新启动你的应用程序。
在消息被成功处理之前,您不能从用户代码 return,因为如果您 return,Streams 假定消息已被成功处理(因此可能会提交相应的偏移量)。关于要点(3),将记录放入特殊的StateStore中供以后处理被认为是"successfully"已处理的记录。
对于Kafka Streams,如果我们使用低级处理器API,我们可以控制是否提交。因此,如果我们的代码出现问题,并且我们不想提交此消息。在这种情况下,Kafka 将多次重新发送此消息,直到问题得到解决。
但是如何在使用其高层流DSL时控制是否提交消息API?
资源:
http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html
你的说法不完全正确。您 不能 "control to commit or not" -- 至少不能直接(既不在处理器 API 中也不在 DSL 中)。您只能使用 ProcessorContext#commit()
来请求 额外的 提交。因此,在调用 #commit()
之后,Streams 尝试尽快提交,但不是立即提交。此外,即使您从未调用 #commit()
,Streams 也会自动提交。您可以通过 Streams 配置控制 Streams 提交间隔 commit.interval.m
(参见 http://docs.confluent.io/current/streams/developer-guide.html#configuring-a-kafka-streams-application)
如果是 "problem",这取决于您遇到的问题类型:
- 如果您检测到无法恢复的问题,您只能抛出异常并"stop the world"(参见下文)。
- 如果您有可恢复的错误,您需要在您自己的代码中 "loop"(例如,在
Processor#process()
或KeyValueMapper#apply()
中,直到问题得到解决并且您可以成功处理当前消息(请注意,您可能 运行 超时,即异常,使用此策略 - 参见消费者配置heartbeat.interval.ms
和 0.10.1session.timeout.ms
[KIP-62] ) - 另一种方法是,将现在无法处理的记录放入
StateStore
中,稍后再处理。然而,很难做到正确并且还打破了一些 Streams 假设(例如,处理顺序)。不建议使用,如果使用,必须非常小心其影响
如果存在未捕获的异常,StreamThread
将终止并且不会发生任何提交(您可以注册一个异常处理程序以获得有关此的通知:http://docs.confluent.io/current/streams/developer-guide.html#using-kafka-streams-within-your-application-code。如果你 StreamThread
都死了,你将需要创建一个新的 KafkaStreams
实例来重新启动你的应用程序。
在消息被成功处理之前,您不能从用户代码 return,因为如果您 return,Streams 假定消息已被成功处理(因此可能会提交相应的偏移量)。关于要点(3),将记录放入特殊的StateStore中供以后处理被认为是"successfully"已处理的记录。