使用 Spring Cloud Stream Kafka Binder 时 Kafka Producer 中的错误处理
Error Handling in Kafka Producer while using Spring Cloud Stream Kafka Binder
我有一个休息 post 端点,它使用 Spring Cloud Stream Kafka Binder 消耗数据并写入 Kafka。现在我们没有任何错误处理。但是我们希望通过在数据未写入 Kafka 时添加额外检查来使该端点容错。我们打算在数据未写入 Kafka 时发送一个异常信息对象。我正在尝试以这种方式使用全局错误来实现这一点
@ServiceActivator(inputChannel = "errorChannel")
public void handle(final ErrorMessage em) {
logger.error("encountered exception" + em.getOriginalMessage().toString());
throw new RuntimeException(em.getOriginalMessage().toString);
}
我的疑问有两个:
- 当我们向Kafka写入数据失败时,这是正确的异常处理方式吗
- 每当数据写入失败时是否调用此处理方法,此更改是否传播到系统级错误处理。
如有其他流程请指教。我们目前正在探索应用程序级别的错误处理和全局级别的错误处理。系统级错误处理暂时关闭 table。提前致谢。
您必须使用 errorChannelEnabled
https://docs.spring.io/spring-cloud-stream/docs/3.1.3/reference/html/spring-cloud-stream.html#_producer_properties
选择异步错误处理
您可以通过将kafka生产者属性 sync
设置为true
https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.3/reference/html/spring-cloud-stream-binder-kafka.html#kafka-producer-properties
来在发送线程上获得异常
我有一个休息 post 端点,它使用 Spring Cloud Stream Kafka Binder 消耗数据并写入 Kafka。现在我们没有任何错误处理。但是我们希望通过在数据未写入 Kafka 时添加额外检查来使该端点容错。我们打算在数据未写入 Kafka 时发送一个异常信息对象。我正在尝试以这种方式使用全局错误来实现这一点
@ServiceActivator(inputChannel = "errorChannel")
public void handle(final ErrorMessage em) {
logger.error("encountered exception" + em.getOriginalMessage().toString());
throw new RuntimeException(em.getOriginalMessage().toString);
}
我的疑问有两个:
- 当我们向Kafka写入数据失败时,这是正确的异常处理方式吗
- 每当数据写入失败时是否调用此处理方法,此更改是否传播到系统级错误处理。
如有其他流程请指教。我们目前正在探索应用程序级别的错误处理和全局级别的错误处理。系统级错误处理暂时关闭 table。提前致谢。
您必须使用 errorChannelEnabled
https://docs.spring.io/spring-cloud-stream/docs/3.1.3/reference/html/spring-cloud-stream.html#_producer_properties
您可以通过将kafka生产者属性 sync
设置为true
https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.3/reference/html/spring-cloud-stream-binder-kafka.html#kafka-producer-properties