执行 Flux.map() 时如何处理错误
How to handle error while executing Flux.map()
我正在尝试弄清楚在 Flux 中映射元素时如何处理错误。
例如,我正在将 CSV 字符串解析为我的业务 POJO 之一:
myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock));
有些行可能包含错误,所以我在日志中得到的是:
reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001))
reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999))
reactor.core.publisher.FluxLog: onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo)
reactor.core.publisher.FluxLog: java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo
我在 API 中阅读了一些错误处理方法,但大多数提到了返回 "error value" 或使用后备 Flux,例如:
Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff);
然而,在我的 myflux
中使用它意味着再次处理整个通量。
那么,有没有办法在处理特定元素时处理错误(即忽略 them/Logging 它们)并继续处理其余的通量?
更新@akarnokd 解决方法
public Flux<StockQuotation> getQuotes(List<String> tickers)
{
Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers)
// Get each set of quotes in a separate thread
.flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
// Convert each list of raw quotes string in a new Flux<String>
.flatMap(list -> Flux.fromIterable(list))
// Convert the string to POJOs
.flatMap(x -> {
try {
return Flux.just(converter.convertHistoricalCSVToStockQuotation(x));
}
catch (IllegalArgumentException ex){
System.out.println("Error decoding stock quotation: " + x);
return Flux.empty();
}
});
return processingFlux;
}
这很有用,但是,您可以看到代码不如以前那么优雅。 Flux API 没有任何方法可以执行此代码的操作吗?
retry(...)
retryWhen(...)
onErrorResumeWith(...)
onErrorReturn(...)
你需要 flatMap
,如果处理失败,你 return 一个空序列:
myflux.flatMap(v -> {
try {
return Flux.just(converter.convertHistoricalCSVToStockQuotation(stock));
} catch (IllegalArgumentException ex) {
return Flux.empty();
}
});
在当前版本的 Reactor 3 中,添加了相当多的方法。所以我们可以这样做:
Flux.onErrorResume(error -> {
System.out.println("Error decoding stock quotation: " + e);
return Flux.empty();
});
查看有关如何处理错误的更多信息here
如果想使用Reactor 3的异常处理方法,可以使用Mono.fromCallable
.
flatMap(x ->
Mono.fromCallable(() -> converter.convertHistoricalCSVToStockQuotation(x))
.flux()
.flatMap(Flux::fromIterable)
.onErrorResume(Flux::empty)
)
不幸的是没有 Flux.fromCallable
,因此假设可调用 returns 是一个列表,您必须手动将其转换为 Flux。
您可以使用 onErrorContinue。
它允许通过删除故障元素并继续处理后续元素来从错误中恢复。
...
// Convert the string to POJOs
.flatMap(x ->
Flux.just(converter.convertHistoricalCSVToStockQuotation(x))
.doOnError(IllegalArgumentException.class,
e -> System.out.println("Error decoding stock quotation: " + x))
//.onErrorStop()
.onErrorResume(IllegalArgumentException.class, e -> Flux.empty())
)
...
我正在尝试弄清楚在 Flux 中映射元素时如何处理错误。
例如,我正在将 CSV 字符串解析为我的业务 POJO 之一:
myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock));
有些行可能包含错误,所以我在日志中得到的是:
reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001))
reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999))
reactor.core.publisher.FluxLog: onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo)
reactor.core.publisher.FluxLog: java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo
我在 API 中阅读了一些错误处理方法,但大多数提到了返回 "error value" 或使用后备 Flux,例如:
Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff);
然而,在我的 myflux
中使用它意味着再次处理整个通量。
那么,有没有办法在处理特定元素时处理错误(即忽略 them/Logging 它们)并继续处理其余的通量?
更新@akarnokd 解决方法
public Flux<StockQuotation> getQuotes(List<String> tickers)
{
Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers)
// Get each set of quotes in a separate thread
.flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
// Convert each list of raw quotes string in a new Flux<String>
.flatMap(list -> Flux.fromIterable(list))
// Convert the string to POJOs
.flatMap(x -> {
try {
return Flux.just(converter.convertHistoricalCSVToStockQuotation(x));
}
catch (IllegalArgumentException ex){
System.out.println("Error decoding stock quotation: " + x);
return Flux.empty();
}
});
return processingFlux;
}
这很有用,但是,您可以看到代码不如以前那么优雅。 Flux API 没有任何方法可以执行此代码的操作吗?
retry(...)
retryWhen(...)
onErrorResumeWith(...)
onErrorReturn(...)
你需要 flatMap
,如果处理失败,你 return 一个空序列:
myflux.flatMap(v -> {
try {
return Flux.just(converter.convertHistoricalCSVToStockQuotation(stock));
} catch (IllegalArgumentException ex) {
return Flux.empty();
}
});
在当前版本的 Reactor 3 中,添加了相当多的方法。所以我们可以这样做:
Flux.onErrorResume(error -> {
System.out.println("Error decoding stock quotation: " + e);
return Flux.empty();
});
查看有关如何处理错误的更多信息here
如果想使用Reactor 3的异常处理方法,可以使用Mono.fromCallable
.
flatMap(x ->
Mono.fromCallable(() -> converter.convertHistoricalCSVToStockQuotation(x))
.flux()
.flatMap(Flux::fromIterable)
.onErrorResume(Flux::empty)
)
不幸的是没有 Flux.fromCallable
,因此假设可调用 returns 是一个列表,您必须手动将其转换为 Flux。
您可以使用 onErrorContinue。 它允许通过删除故障元素并继续处理后续元素来从错误中恢复。
...
// Convert the string to POJOs
.flatMap(x ->
Flux.just(converter.convertHistoricalCSVToStockQuotation(x))
.doOnError(IllegalArgumentException.class,
e -> System.out.println("Error decoding stock quotation: " + x))
//.onErrorStop()
.onErrorResume(IllegalArgumentException.class, e -> Flux.empty())
)
...