Will Spring KafkaContainerStoppingErrorHandler 提交批处理侦听器的偏移量
Will Spring KafkaContainerStoppingErrorHandler commits offset for batch listener
我正在研究 Spring Kafka 实现,我的用例是批量使用来自 Kafka 主题的消息(使用批量侦听器)。当我使用消息列表时,将迭代并调用 REST 端点以丰富消息。如果 REST API 因任何运行时异常而失败,我已经使用 spring 重试实现了重试逻辑。我想在重试次数失败后停止容器。所以计划使用 KafkaContainerStoppingErrorHandler 来实现这一点。 KafkaContainerStoppingErrorHandler 是否提交之前的成功消息 - 假设我们收到 10 条消息,对于消息 1、2、3、4,充实调用成功,对于消息 5,充实 API 调用失败。所以当我们重新启动容器时,我会再次获得全部 10 条消息还是会收到消息 5-10?
或者有什么方法可以实现上述用例?我研究了 Spring kafka 的所有类型的错误句柄,需要输入有关如何实现上述要求的信息。
你会再次得到它们。
您可以使用 DefaultErrorHandler
(带有自定义恢复器)并抛出一个 BatchListenerFailedException
来指示批处理中的哪条记录失败。
错误处理程序将提交到该记录的偏移量,并使用失败的记录调用恢复器;在您的自定义恢复器中,您可以停止容器(使用与容器停止错误处理程序相同的逻辑)。
在 2.8 之前的版本中,RecoveringBatchErrorHandler
提供了相同的功能。
我正在研究 Spring Kafka 实现,我的用例是批量使用来自 Kafka 主题的消息(使用批量侦听器)。当我使用消息列表时,将迭代并调用 REST 端点以丰富消息。如果 REST API 因任何运行时异常而失败,我已经使用 spring 重试实现了重试逻辑。我想在重试次数失败后停止容器。所以计划使用 KafkaContainerStoppingErrorHandler 来实现这一点。 KafkaContainerStoppingErrorHandler 是否提交之前的成功消息 - 假设我们收到 10 条消息,对于消息 1、2、3、4,充实调用成功,对于消息 5,充实 API 调用失败。所以当我们重新启动容器时,我会再次获得全部 10 条消息还是会收到消息 5-10?
或者有什么方法可以实现上述用例?我研究了 Spring kafka 的所有类型的错误句柄,需要输入有关如何实现上述要求的信息。
你会再次得到它们。
您可以使用 DefaultErrorHandler
(带有自定义恢复器)并抛出一个 BatchListenerFailedException
来指示批处理中的哪条记录失败。
错误处理程序将提交到该记录的偏移量,并使用失败的记录调用恢复器;在您的自定义恢复器中,您可以停止容器(使用与容器停止错误处理程序相同的逻辑)。
在 2.8 之前的版本中,RecoveringBatchErrorHandler
提供了相同的功能。