Spring Cloud Kafka Streams 中的错误处理
Error handling in Spring Cloud Kafka Streams
我正在使用 Spring Cloud Stream 和 Kafka Streams。假设我有一个处理器,它是一个将字符串的 KStream 转换为 CityProgrammes 的 KStream 的函数。它调用 API 按名称查找城市,并调用其他转换查找该城市附近的任何事件。
现在的问题是在转换过程中发生任何错误,整个应用程序停止。我想将该特定消息发送到 DLQ 并继续进行。我已经读了好几天了,每个人都建议处理被调用服务中的错误,但我认为这是无稽之谈,而且我仍然需要 return KStream:我如何在捕获中做到这一点?
我也查看了 UncaughtExeptionHandler,但它不知道该消息,只能重新启动不会跳过此无效消息的处理。
这听起来像是 A-B 问题,因此问题改写为:发生异常时如何维护 KStream 中的流程并将无效项发送到 DLQ?
当涉及到您遇到的应用程序级错误时,如何处理错误取决于应用程序本身。 Kafka Streams 和 Spring Cloud Stream binder 主要支持框架级别的反序列化和序列化错误。虽然是这样,但我认为你的场景是可以处理的。如果您使用的是 2.8 之前的 Kafka 客户端,这是我之前就类似问题给出的 SO 答案:
如果您使用的是 Kafka/Streams 2.8,这里有一个您可以使用的想法。但是,下面的代码只能用作起点。根据您的用例进行调整。阅读有关 Kafka Streams 2.8 中分支如何工作的更多信息。 branching API 在 2.8 中对之前的版本进行了重大重构。
public Function<KStream<?, String>, KStream<?, Foo>> convert() {
Foo[] foo = new Foo[0];
return input -> {
final Map<String, ? extends KStream<?, String>> branches =
input.split(Named.as("foo-")).branch((key, value) -> {
try {
foo[0] = new Foo(); // your API call for CitiProgramme converion here, possibly.
return true;
}
catch (Exception e) {
Message<?> message = MessageBuilder.withPayload(value).build();
streamBridge.send("to-my-dlt", message);
return false;
}
}, Branched.as("bar"))
.defaultBranch();
final KStream<?, String> kStream = branches.get("foo-bar");
return kStream.map((key, value) -> new KeyValue<>("", foo[0]));
};
}
}
默认分支在此代码中被忽略,因为它只包含抛出异常的记录。这些由上面的 catch
语句处理,我们在其中以编程方式将记录发送到 DLT。最后,我们获取好的记录并将它们映射到一个新的KStream
并通过outbound发送。
我正在使用 Spring Cloud Stream 和 Kafka Streams。假设我有一个处理器,它是一个将字符串的 KStream 转换为 CityProgrammes 的 KStream 的函数。它调用 API 按名称查找城市,并调用其他转换查找该城市附近的任何事件。
现在的问题是在转换过程中发生任何错误,整个应用程序停止。我想将该特定消息发送到 DLQ 并继续进行。我已经读了好几天了,每个人都建议处理被调用服务中的错误,但我认为这是无稽之谈,而且我仍然需要 return KStream:我如何在捕获中做到这一点?
我也查看了 UncaughtExeptionHandler,但它不知道该消息,只能重新启动不会跳过此无效消息的处理。
这听起来像是 A-B 问题,因此问题改写为:发生异常时如何维护 KStream 中的流程并将无效项发送到 DLQ?
当涉及到您遇到的应用程序级错误时,如何处理错误取决于应用程序本身。 Kafka Streams 和 Spring Cloud Stream binder 主要支持框架级别的反序列化和序列化错误。虽然是这样,但我认为你的场景是可以处理的。如果您使用的是 2.8 之前的 Kafka 客户端,这是我之前就类似问题给出的 SO 答案:
如果您使用的是 Kafka/Streams 2.8,这里有一个您可以使用的想法。但是,下面的代码只能用作起点。根据您的用例进行调整。阅读有关 Kafka Streams 2.8 中分支如何工作的更多信息。 branching API 在 2.8 中对之前的版本进行了重大重构。
public Function<KStream<?, String>, KStream<?, Foo>> convert() {
Foo[] foo = new Foo[0];
return input -> {
final Map<String, ? extends KStream<?, String>> branches =
input.split(Named.as("foo-")).branch((key, value) -> {
try {
foo[0] = new Foo(); // your API call for CitiProgramme converion here, possibly.
return true;
}
catch (Exception e) {
Message<?> message = MessageBuilder.withPayload(value).build();
streamBridge.send("to-my-dlt", message);
return false;
}
}, Branched.as("bar"))
.defaultBranch();
final KStream<?, String> kStream = branches.get("foo-bar");
return kStream.map((key, value) -> new KeyValue<>("", foo[0]));
};
}
}
默认分支在此代码中被忽略,因为它只包含抛出异常的记录。这些由上面的 catch
语句处理,我们在其中以编程方式将记录发送到 DLT。最后,我们获取好的记录并将它们映射到一个新的KStream
并通过outbound发送。