为什么在 Spring Cloud Stream 反应式消费者中遇到异常时我会收到 onComplete 信号?

Why am I getting onComplete signal when an exception is encountered in Spring Cloud Stream reactive consumer?

我将 Spring Reactor 与 Spring Cloud Stream(GCP Pub/Sub Binder)和 运行 一起用于错误处理问题。我可以用一个非常简单的例子重现这个问题:

@Bean
public Function<Flux<String>, Mono<Void>> consumer() {
    return flux -> flux
        .doOnNext(msg -> log.info("New message received: {}", msg))
        .map(msg -> {
            if (true) { 
                throw new RuntimeException("exception encountered!");
            }
            return msg;
        })
        .doOnError(throwable -> log.error("Failed to consume message", throwable))
        .then();
}

我期望的行为是看到“无法使用消息”打印,但是,这似乎不是发生的事情。在链中添加 .log() 调用时,我看到 onNext/onComplete 信号,我希望看到 onError 信号。

我的实际代码如下所示:

@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
    return flux -> flux
        .doOnNext(msg -> log.info("New message received: {}", msg))
        .flatMap(myService::processMessage) // exception happens deep in here
        .doOnError(throwable -> log.error("Failed to consume message", throwable))
        .then();
}

我注意到在我的服务中 class 我试图对我的 Reactor 发布者进行错误处理。但是,使用 Spring Cloud Stream 时不会出现 onError 信号。如果我在单元测试中简单地调用我的服务 myService.processMessage(msg) 并模拟异常,我的反应链将正确传播错误信号。

当我连接到 Spring Cloud Stream 时,这似乎是一个问题。我想知道 Spring Cloud Function/Stream 是否正在执行任何全局错误包装?

在我的非平凡代码中,我确实注意到这条错误消息可能与我没有收到错误信号的原因有关?

ERROR --- onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: ...

让我更加困惑的是,如果我将 Spring Cloud Stream 绑定切换到非反应性实现,我能够在我的反应链中获得 onError 信号:

@Bean
public Consumer<CustomMessage> consumer(MyService myService) {
    return customMessage -> Mono.just(customMessage)
        .doOnNext(msg -> log.info("New message received: {}", msg))
        .flatMap(myService::processMessage) // exception happens deep in here
        .doOnError(throwable -> log.error("Failed to consume message", throwable)) // prints successfully this time
        .subscribe();
}

我认为问题存在于以下代码中:

    .map(msg -> new RuntimeException("exception encountered!"))

您映射行中的 lambda 返回异常,而不是抛出异常。

这就是我从自己的调查中收集到的信息,也许这可能对其他人有所帮助。预先警告,我可能没有使用正确的“Spring Reactor Language”,但这就是我最终解决它的方式...

Hoxton.SR5中,一个onErrorContinue was included on the reactive binding that managed the flux subscription. The problem with onErrorContinue是它通过在失败的运算符(如果支持)。

这意味着当我们的 map/flatMap 运算符发生错误时,onErrorContinue BiConsumer 将启动并将下游信号修改为 onComplete() ( Mono<T>) 或 request(...)(如果它从 Flux<T> 请求新元素)。这导致我们的 doOnError(...) 运算符没有执行,因为没有 onError() 信号。

最终 SCS 团队决定 remove this error handling wrapperHoxton.SR6 不再有此 onErrorContinue。但是,这意味着向上传播到 SCS 绑定的异常将导致 Flux 订阅被切断。由于没有订阅者,后续消息将无处可路由。

此错误处理已传递给客户端,我们将 onErrorResume 运算符添加到 内部发布者 以有效丢弃错误信号。当 myService::processMessage 发布者遇到错误时,onErrorResume 会将发布者切换到作为参数传入的后备发布者,并从运算符链中的那个点恢复。在我们的例子中,这个回退发布者只是 returns Mono.empty() 这允许我们丢弃错误信号,同时仍然允许内部错误处理机制运行,同时也不影响外部源发布者。

onErrorResume Example/Explanation

上面的技术可以用一个非常简单的例子来说明。

Flux.just(1, 2, 3)
    .flatMap(i -> i == 2
        ? Mono.error(new RuntimeException("error")
        : Mono.just(i))
    .onErrorResume(t -> Flux.just(4, 5, 6))
    .doOnNext(i -> log.info("Element: {}", i))
    .subscribe();

上面的Flux<Integer>会输出如下:

Element: 1
Element: 4
Element: 5
Element: 6

由于在元素 2 处遇到错误,onErrorResume 回退开始,新发布者有效地 Flux.just(4, 5, 6) 恢复 倒退。在我们的例子中,我们不想影响源发布者(即 Flux.just(1, 2, 3))。我们只想删除错误的元素 (2) 并继续下一个元素 (3)。

我们不能简单地将Flux.just(4, 5, 6)更改为Flux.empty()Mono.empty()

Flux.just(1, 2, 3)
    .flatMap(i -> i == 2
        ? Mono.error(new RuntimeException("error")
        : Mono.just(i))
    .onErrorResume(t -> Mono.empty())
    .doOnNext(i -> log.info("Element: {}", i))
    .subscribe();

这将导致输出以下内容:

Element: 1

这是因为 onErrorResume 已将上游发布者替换为后备发布者(即 Mono.empty()),并且从那时起 恢复

实现我们期望的输出:

Element: 1
Element: 3

我们必须将 onErrorResume 运算符放在 flatMap 的内部发布者上:

public Mono<Integer> func(int i) {
    return i = 2 ? Mono.error(new RuntimeException("error")) : Mono.just(i);
}

Flux.just(1, 2, 3)
    .flatMap(i -> func(i)
        onErrorResume(t -> Mono.empty()))
    .doOnNext(i -> log.info("Element: {}", i))
    .subscribe();

现在,onErrorResume 仅影响 func(i) 返回的内部发布者。如果 func(i) 中的运算符发生错误,onErrorResume 将回退到 Mono.empty() 有效地完成 Mono<T> 而不会爆炸。这也仍然允许在回退运行之前应用错误处理运算符(例如 doOnErrorwithin func(i)。这是因为,与 onErrorContinue 不同,它不会影响上游运算符并在错误位置更改下一个信号。

最终解决方案

在我的问题中重复使用 code-snippet,我已将我的 Spring 云版本升级到 Hoxton.SR6 并将代码更改为如下内容:

@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
    return flux -> flux
        .doOnNext(msg -> log.info("New message received: {}", msg))
        .flatMap(msg -> myService.processMessage(msg)
            .onErrorResume(throwable -> Mono.empty())
        )
        .then();
}

请注意 onErrorResume 位于内部发布者上(在 flatMap 内)。