Spring Cloud Kafka Streams 中的错误处理

Error handling in Spring Cloud Kafka Streams

我正在使用 Spring Cloud Stream 和 Kafka Streams。假设我有一个处理器,它是一个将字符串的 KStream 转换为 CityProgrammes 的 KStream 的函数。它调用 API 按名称查找城市,并调用其他转换查找该城市附近的任何事件。

现在的问题是在转换过程中发生任何错误,整个应用程序停止。我想将该特定消息发送到 DLQ 并继续进行。我已经读了好几天了,每个人都建议处理被调用服务中的错误,但我认为这是无稽之谈,而且我仍然需要 return KStream:我如何在捕获中做到这一点?

我也查看了 UncaughtExeptionHandler,但它不知道该消息,只能重新启动不会跳过此无效消息的处理。

这听起来像是 A-B 问题,因此问题改写为:发生异常时如何维护 KStream 中的流程并将无效项发送到 DLQ?

当涉及到您遇到的应用程序级错误时,如何处理错误取决于应用程序本身。 Kafka Streams 和 Spring Cloud Stream binder 主要支持框架级别的反序列化和序列化错误。虽然是这样,但我认为你的场景是可以处理的。如果您使用的是 2.8 之前的 Kafka 客户端,这是我之前就类似问题给出的 SO 答案:

如果您使用的是 Kafka/Streams 2.8,这里有一个您可以使用的想法。但是,下面的代码只能用作起点。根据您的用例进行调整。阅读有关 Kafka Streams 2.8 中分支如何工作的更多信息。 branching API 在 2.8 中对之前的版本进行了重大重构。

public Function<KStream<?, String>, KStream<?, Foo>> convert() {
            Foo[] foo = new Foo[0];
            return input -> {
                final Map<String, ? extends KStream<?, String>> branches =
                        input.split(Named.as("foo-")).branch((key, value) -> {
                                    try {
                                        foo[0] = new Foo(); // your API call for CitiProgramme converion here, possibly.
                                        return true;
                                    }
                                    catch (Exception e) {
                                        Message<?> message = MessageBuilder.withPayload(value).build();
                                        streamBridge.send("to-my-dlt", message);
                                        return false;
                                    }

                                }, Branched.as("bar"))
                                .defaultBranch();

                final KStream<?, String> kStream = branches.get("foo-bar");
                return kStream.map((key, value) -> new KeyValue<>("", foo[0]));
            };

        }


    }

默认分支在此代码中被忽略,因为它只包含抛出异常的记录。这些由上面的 catch 语句处理,我们在其中以编程方式将记录发送到 DLT。最后,我们获取好的记录并将它们映射到一个新的KStream并通过outbound发送。