在 flatMap 中抛出的异常被 onErrorResume 运算符忽略
Exception thrown in a flatMap are ignored by onErrorResume operator
考虑以下代码:
@Slf4j
@ExtendWith(MockitoExtension.class)
class ConnectionEventsConsumerTest {
@Test
public void testOnErrorResume() {
Flux.range(0, 5)
.doOnNext(event -> log.info("Processing - {}", event))
.flatMap(event -> processEvent(event)
.doOnSuccess(result -> log.info("Processed - {}", event))
.onErrorResume(t -> handleError(t, event))
)
.doOnError(t -> log.error("Exception propagated", t))
//.log()
.then()
.subscribe();
}
private Mono<Void> processEvent(Object object) {
return Mono.error(() -> new RuntimeException("test"));
//throw new RuntimeException("test");
}
private Mono<Void> handleError(Throwable throwable, Object object) {
log.error("Processing Failed - {}", object);
return Mono.empty();
}
}
如果方法 processEvent returns a Mono.error 与抛出异常时的输出完全不同。
代码原样(返回 Mono.error),我看到了我的预期,Processing 和 Processins 的 300 次迭代失败,我没有看到任何异常传播。
17:33:19.853 [main] INFO
com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing - 0 17:33:19.864 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing Failed - 0 17:33:19.865 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing - 1 17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing Failed - 1 17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing - 2 17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing Failed - 2 17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing - 3 17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing Failed - 3 17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing - 4 17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing Failed - 4
另一方面,如果我取消对 throw 的注释,我会看到正在处理的 Flux 中的单个项目,我看不到来自 handleError 的消息,我会看到“Exception Propagated”
17:35:53.950 [main] INFO
com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing - 0 17:35:53.968 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Exception propagated java.lang.RuntimeException: test
如果这是设计使然,flatMap 的最佳做法是什么?想到的一个简单的解决方案是用 try-catch 包围 flatMap 的内容,以将异常包装在 Mono.error 中。虽然它有效,但它不够优雅且过于手动,很可能会被遗忘。
一个方法creating/returning一个Mono
不应该以这种方式抛出异常。由于在 Mono
被组装(创建)之前抛出异常,因此 flatMap
中的后续运算符不可能生效,因为它们需要 Mono
来操作。
如果您无法控制 processEvent()
方法来修复它的行为,那么您可以用 Mono.defer
包装它,这将确保即使在组装期间出现的错误也会通过flatMap
里面的 Mono
:
Flux.range(0, 5)
.doOnNext(event -> log.info("Processing - {}", event))
.flatMap(event -> Mono.defer(() -> processEvent(event))
.doOnSuccess(result -> log.info("Processed - {}", event))
.onErrorResume(t -> handleError(t, event)))
.doOnError(t -> log.error("Exception propagated", t))
private Mono<Void> processEvent(Object object) {
throw new RuntimeException("test");
}
请注意,在 map
或 doOnNext
等其他中间运算符中,您可以随意以丑陋的方式抛出异常,因为 Reactor 可以将它们转换为正确的错误信号,因为此时 Mono
已经在进行中。
考虑以下代码:
@Slf4j
@ExtendWith(MockitoExtension.class)
class ConnectionEventsConsumerTest {
@Test
public void testOnErrorResume() {
Flux.range(0, 5)
.doOnNext(event -> log.info("Processing - {}", event))
.flatMap(event -> processEvent(event)
.doOnSuccess(result -> log.info("Processed - {}", event))
.onErrorResume(t -> handleError(t, event))
)
.doOnError(t -> log.error("Exception propagated", t))
//.log()
.then()
.subscribe();
}
private Mono<Void> processEvent(Object object) {
return Mono.error(() -> new RuntimeException("test"));
//throw new RuntimeException("test");
}
private Mono<Void> handleError(Throwable throwable, Object object) {
log.error("Processing Failed - {}", object);
return Mono.empty();
}
}
如果方法 processEvent returns a Mono.error 与抛出异常时的输出完全不同。
代码原样(返回 Mono.error),我看到了我的预期,Processing 和 Processins 的 300 次迭代失败,我没有看到任何异常传播。
17:33:19.853 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing - 0 17:33:19.864 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing Failed - 0 17:33:19.865 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing - 1 17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing Failed - 1 17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing - 2 17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing Failed - 2 17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing - 3 17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing Failed - 3 17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing - 4 17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing Failed - 4
另一方面,如果我取消对 throw 的注释,我会看到正在处理的 Flux 中的单个项目,我看不到来自 handleError 的消息,我会看到“Exception Propagated”
17:35:53.950 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing - 0 17:35:53.968 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Exception propagated java.lang.RuntimeException: test
如果这是设计使然,flatMap 的最佳做法是什么?想到的一个简单的解决方案是用 try-catch 包围 flatMap 的内容,以将异常包装在 Mono.error 中。虽然它有效,但它不够优雅且过于手动,很可能会被遗忘。
一个方法creating/returning一个Mono
不应该以这种方式抛出异常。由于在 Mono
被组装(创建)之前抛出异常,因此 flatMap
中的后续运算符不可能生效,因为它们需要 Mono
来操作。
如果您无法控制 processEvent()
方法来修复它的行为,那么您可以用 Mono.defer
包装它,这将确保即使在组装期间出现的错误也会通过flatMap
里面的 Mono
:
Flux.range(0, 5)
.doOnNext(event -> log.info("Processing - {}", event))
.flatMap(event -> Mono.defer(() -> processEvent(event))
.doOnSuccess(result -> log.info("Processed - {}", event))
.onErrorResume(t -> handleError(t, event)))
.doOnError(t -> log.error("Exception propagated", t))
private Mono<Void> processEvent(Object object) {
throw new RuntimeException("test");
}
请注意,在 map
或 doOnNext
等其他中间运算符中,您可以随意以丑陋的方式抛出异常,因为 Reactor 可以将它们转换为正确的错误信号,因为此时 Mono
已经在进行中。