为什么在 Spring Cloud Stream 反应式消费者中遇到异常时我会收到 onComplete 信号?
Why am I getting onComplete signal when an exception is encountered in Spring Cloud Stream reactive consumer?
我将 Spring Reactor 与 Spring Cloud Stream(GCP Pub/Sub Binder)和 运行 一起用于错误处理问题。我可以用一个非常简单的例子重现这个问题:
@Bean
public Function<Flux<String>, Mono<Void>> consumer() {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}", msg))
.map(msg -> {
if (true) {
throw new RuntimeException("exception encountered!");
}
return msg;
})
.doOnError(throwable -> log.error("Failed to consume message", throwable))
.then();
}
我期望的行为是看到“无法使用消息”打印,但是,这似乎不是发生的事情。在链中添加 .log()
调用时,我看到 onNext
/onComplete
信号,我希望看到 onError
信号。
我的实际代码如下所示:
@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}", msg))
.flatMap(myService::processMessage) // exception happens deep in here
.doOnError(throwable -> log.error("Failed to consume message", throwable))
.then();
}
我注意到在我的服务中 class 我试图对我的 Reactor 发布者进行错误处理。但是,使用 Spring Cloud Stream 时不会出现 onError
信号。如果我在单元测试中简单地调用我的服务 myService.processMessage(msg)
并模拟异常,我的反应链将正确传播错误信号。
当我连接到 Spring Cloud Stream 时,这似乎是一个问题。我想知道 Spring Cloud Function/Stream 是否正在执行任何全局错误包装?
在我的非平凡代码中,我确实注意到这条错误消息可能与我没有收到错误信号的原因有关?
ERROR --- onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: ...
让我更加困惑的是,如果我将 Spring Cloud Stream 绑定切换到非反应性实现,我能够在我的反应链中获得 onError
信号:
@Bean
public Consumer<CustomMessage> consumer(MyService myService) {
return customMessage -> Mono.just(customMessage)
.doOnNext(msg -> log.info("New message received: {}", msg))
.flatMap(myService::processMessage) // exception happens deep in here
.doOnError(throwable -> log.error("Failed to consume message", throwable)) // prints successfully this time
.subscribe();
}
我认为问题存在于以下代码中:
.map(msg -> new RuntimeException("exception encountered!"))
您映射行中的 lambda 返回异常,而不是抛出异常。
这就是我从自己的调查中收集到的信息,也许这可能对其他人有所帮助。预先警告,我可能没有使用正确的“Spring Reactor Language”,但这就是我最终解决它的方式...
在Hoxton.SR5
中,一个onErrorContinue
was included on the reactive binding that managed the flux subscription. The problem with onErrorContinue
是它通过在失败的运算符(如果支持)。
这意味着当我们的 map
/flatMap
运算符发生错误时,onErrorContinue
BiConsumer 将启动并将下游信号修改为 onComplete()
( Mono<T>
) 或 request(...)
(如果它从 Flux<T>
请求新元素)。这导致我们的 doOnError(...)
运算符没有执行,因为没有 onError()
信号。
最终 SCS 团队决定 remove this error handling wrapper。 Hoxton.SR6
不再有此 onErrorContinue
。但是,这意味着向上传播到 SCS 绑定的异常将导致 Flux 订阅被切断。由于没有订阅者,后续消息将无处可路由。
此错误处理已传递给客户端,我们将 onErrorResume
运算符添加到 内部发布者 以有效丢弃错误信号。当 myService::processMessage
发布者遇到错误时,onErrorResume
会将发布者切换到作为参数传入的后备发布者,并从运算符链中的那个点恢复。在我们的例子中,这个回退发布者只是 returns Mono.empty()
这允许我们丢弃错误信号,同时仍然允许内部错误处理机制运行,同时也不影响外部源发布者。
onErrorResume
Example/Explanation
上面的技术可以用一个非常简单的例子来说明。
Flux.just(1, 2, 3)
.flatMap(i -> i == 2
? Mono.error(new RuntimeException("error")
: Mono.just(i))
.onErrorResume(t -> Flux.just(4, 5, 6))
.doOnNext(i -> log.info("Element: {}", i))
.subscribe();
上面的Flux<Integer>
会输出如下:
Element: 1
Element: 4
Element: 5
Element: 6
由于在元素 2
处遇到错误,onErrorResume
回退开始,新发布者有效地 Flux.just(4, 5, 6)
从 恢复 倒退。在我们的例子中,我们不想影响源发布者(即 Flux.just(1, 2, 3)
)。我们只想删除错误的元素 (2
) 并继续下一个元素 (3
)。
我们不能简单地将Flux.just(4, 5, 6)
更改为Flux.empty()
或Mono.empty()
:
Flux.just(1, 2, 3)
.flatMap(i -> i == 2
? Mono.error(new RuntimeException("error")
: Mono.just(i))
.onErrorResume(t -> Mono.empty())
.doOnNext(i -> log.info("Element: {}", i))
.subscribe();
这将导致输出以下内容:
Element: 1
这是因为 onErrorResume
已将上游发布者替换为后备发布者(即 Mono.empty()
),并且从那时起 恢复 。
实现我们期望的输出:
Element: 1
Element: 3
我们必须将 onErrorResume
运算符放在 flatMap
的内部发布者上:
public Mono<Integer> func(int i) {
return i = 2 ? Mono.error(new RuntimeException("error")) : Mono.just(i);
}
Flux.just(1, 2, 3)
.flatMap(i -> func(i)
onErrorResume(t -> Mono.empty()))
.doOnNext(i -> log.info("Element: {}", i))
.subscribe();
现在,onErrorResume
仅影响 func(i)
返回的内部发布者。如果 func(i)
中的运算符发生错误,onErrorResume
将回退到 Mono.empty()
有效地完成 Mono<T>
而不会爆炸。这也仍然允许在回退运行之前应用错误处理运算符(例如 doOnError
)within func(i)
。这是因为,与 onErrorContinue
不同,它不会影响上游运算符并在错误位置更改下一个信号。
最终解决方案
在我的问题中重复使用 code-snippet,我已将我的 Spring 云版本升级到 Hoxton.SR6
并将代码更改为如下内容:
@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}", msg))
.flatMap(msg -> myService.processMessage(msg)
.onErrorResume(throwable -> Mono.empty())
)
.then();
}
请注意 onErrorResume
位于内部发布者上(在 flatMap
内)。
我将 Spring Reactor 与 Spring Cloud Stream(GCP Pub/Sub Binder)和 运行 一起用于错误处理问题。我可以用一个非常简单的例子重现这个问题:
@Bean
public Function<Flux<String>, Mono<Void>> consumer() {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}", msg))
.map(msg -> {
if (true) {
throw new RuntimeException("exception encountered!");
}
return msg;
})
.doOnError(throwable -> log.error("Failed to consume message", throwable))
.then();
}
我期望的行为是看到“无法使用消息”打印,但是,这似乎不是发生的事情。在链中添加 .log()
调用时,我看到 onNext
/onComplete
信号,我希望看到 onError
信号。
我的实际代码如下所示:
@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}", msg))
.flatMap(myService::processMessage) // exception happens deep in here
.doOnError(throwable -> log.error("Failed to consume message", throwable))
.then();
}
我注意到在我的服务中 class 我试图对我的 Reactor 发布者进行错误处理。但是,使用 Spring Cloud Stream 时不会出现 onError
信号。如果我在单元测试中简单地调用我的服务 myService.processMessage(msg)
并模拟异常,我的反应链将正确传播错误信号。
当我连接到 Spring Cloud Stream 时,这似乎是一个问题。我想知道 Spring Cloud Function/Stream 是否正在执行任何全局错误包装?
在我的非平凡代码中,我确实注意到这条错误消息可能与我没有收到错误信号的原因有关?
ERROR --- onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: ...
让我更加困惑的是,如果我将 Spring Cloud Stream 绑定切换到非反应性实现,我能够在我的反应链中获得 onError
信号:
@Bean
public Consumer<CustomMessage> consumer(MyService myService) {
return customMessage -> Mono.just(customMessage)
.doOnNext(msg -> log.info("New message received: {}", msg))
.flatMap(myService::processMessage) // exception happens deep in here
.doOnError(throwable -> log.error("Failed to consume message", throwable)) // prints successfully this time
.subscribe();
}
我认为问题存在于以下代码中:
.map(msg -> new RuntimeException("exception encountered!"))
您映射行中的 lambda 返回异常,而不是抛出异常。
这就是我从自己的调查中收集到的信息,也许这可能对其他人有所帮助。预先警告,我可能没有使用正确的“Spring Reactor Language”,但这就是我最终解决它的方式...
在Hoxton.SR5
中,一个onErrorContinue
was included on the reactive binding that managed the flux subscription. The problem with onErrorContinue
是它通过在失败的运算符(如果支持)。
这意味着当我们的 map
/flatMap
运算符发生错误时,onErrorContinue
BiConsumer 将启动并将下游信号修改为 onComplete()
( Mono<T>
) 或 request(...)
(如果它从 Flux<T>
请求新元素)。这导致我们的 doOnError(...)
运算符没有执行,因为没有 onError()
信号。
最终 SCS 团队决定 remove this error handling wrapper。 Hoxton.SR6
不再有此 onErrorContinue
。但是,这意味着向上传播到 SCS 绑定的异常将导致 Flux 订阅被切断。由于没有订阅者,后续消息将无处可路由。
此错误处理已传递给客户端,我们将 onErrorResume
运算符添加到 内部发布者 以有效丢弃错误信号。当 myService::processMessage
发布者遇到错误时,onErrorResume
会将发布者切换到作为参数传入的后备发布者,并从运算符链中的那个点恢复。在我们的例子中,这个回退发布者只是 returns Mono.empty()
这允许我们丢弃错误信号,同时仍然允许内部错误处理机制运行,同时也不影响外部源发布者。
onErrorResume
Example/Explanation
上面的技术可以用一个非常简单的例子来说明。
Flux.just(1, 2, 3)
.flatMap(i -> i == 2
? Mono.error(new RuntimeException("error")
: Mono.just(i))
.onErrorResume(t -> Flux.just(4, 5, 6))
.doOnNext(i -> log.info("Element: {}", i))
.subscribe();
上面的Flux<Integer>
会输出如下:
Element: 1
Element: 4
Element: 5
Element: 6
由于在元素 2
处遇到错误,onErrorResume
回退开始,新发布者有效地 Flux.just(4, 5, 6)
从 恢复 倒退。在我们的例子中,我们不想影响源发布者(即 Flux.just(1, 2, 3)
)。我们只想删除错误的元素 (2
) 并继续下一个元素 (3
)。
我们不能简单地将Flux.just(4, 5, 6)
更改为Flux.empty()
或Mono.empty()
:
Flux.just(1, 2, 3)
.flatMap(i -> i == 2
? Mono.error(new RuntimeException("error")
: Mono.just(i))
.onErrorResume(t -> Mono.empty())
.doOnNext(i -> log.info("Element: {}", i))
.subscribe();
这将导致输出以下内容:
Element: 1
这是因为 onErrorResume
已将上游发布者替换为后备发布者(即 Mono.empty()
),并且从那时起 恢复 。
实现我们期望的输出:
Element: 1
Element: 3
我们必须将 onErrorResume
运算符放在 flatMap
的内部发布者上:
public Mono<Integer> func(int i) {
return i = 2 ? Mono.error(new RuntimeException("error")) : Mono.just(i);
}
Flux.just(1, 2, 3)
.flatMap(i -> func(i)
onErrorResume(t -> Mono.empty()))
.doOnNext(i -> log.info("Element: {}", i))
.subscribe();
现在,onErrorResume
仅影响 func(i)
返回的内部发布者。如果 func(i)
中的运算符发生错误,onErrorResume
将回退到 Mono.empty()
有效地完成 Mono<T>
而不会爆炸。这也仍然允许在回退运行之前应用错误处理运算符(例如 doOnError
)within func(i)
。这是因为,与 onErrorContinue
不同,它不会影响上游运算符并在错误位置更改下一个信号。
最终解决方案
在我的问题中重复使用 code-snippet,我已将我的 Spring 云版本升级到 Hoxton.SR6
并将代码更改为如下内容:
@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}", msg))
.flatMap(msg -> myService.processMessage(msg)
.onErrorResume(throwable -> Mono.empty())
)
.then();
}
请注意 onErrorResume
位于内部发布者上(在 flatMap
内)。