Spring Reactor: Optional<T>对应的class是什么?
Spring Reactor: What's the corresponding class to Optional<T>?
所以我有一个 Flux<Foo>
,我想将每个 Foo
映射到 Baz
。问题是,getBaz(Foo foo)
可能会抛出一个 IOException
.
所以我考虑过使用 Mono<Baz> getBazRx(Foo foo)
方法,在出现异常时 return 可以是 Mono.just(baz)
或 Mono.empty()
。
然后将以 Flux<Mono<Baz>>
结束,这提醒了 Optional<T>
容器。
Spring Reactor 就是这样做的吗?如何正确食用?
既然你想跳过这个错误(例如只是记录它),你可以使用onErrorContinue
。此外,由于 getBaz
抛出一个已检查的异常,我们需要捕获它并 return
(不抛出)一个 RuntimeException
代替。 Reactor 有一个实用方法可以做到这一点 Exceptions.propagate
:
flux
.map(foo -> {
try {
return getBaz(foo);
} catch (IOException e) {
return Exceptions.propagate(e);
}
})
.onErrorContinue(RuntimeException.class, (t, b) -> log.error(t))
.subscribe(baz -> log.info("Read value {}", baz));
在反应流中,"optionals" 通常通过从流中删除不存在的元素来处理(例如,一个空的 Mono
,或一个删除元素的 Flux
。),而不是有 Flux<Optional>
、Mono<Optional>
或 Flux<Mono>
调用同步 getBaz
方法时,可以使用单个 .handle
操作,如下所示:
flux
.handle((foo, sink) -> {
try {
// propagate Baz down the stream
sink.next(getBaz(foo));
} catch (IOException e) {
// Since sink.next is not called here,
// the problematic element will be dropped from the stream
log.error(e);
}
})
调用异步getBazRx
方法(返回Mono
)时,可以在flatMap
/flatMapSequential
/[=26=中使用onErrorResume
],像这样:
flux
.flatMap(foo -> getBazRx(foo)
.onErrorResume(t -> {
log.error(t);
return Mono.empty();
}))
(或者您可以将 .onErrorResume
移动到 .getBazRx
内,具体取决于您要捕获并忽略异常的位置)
此外,既然你在问题中提到了它......如果你要创建 getBazRx
来包装 getBaz
,你应该 永远不要 做如果 getBaz
有可能阻止,则类似这样:
Mono<Baz> getBazRx(Foo foo) {
// BAD!!!
try {
return Mono.just(getBaz(foo));
} catch (IOException e) {
return Mono.error(e) // or Mono.empty() if you want to ignore
}
}
该实现实际上只是一个同步方法模仿一个异步方法。它有两个问题:
- 工作立即完成,而不是在订阅返回的
Mono
之后
- 如果
getBaz
阻塞,您最终可能会阻塞事件循环
相反,您应该推迟工作,直到单声道被订阅,并且 运行 在 Scheduler
上用于阻塞操作的任何阻塞操作,如下所示:
Mono<Baz> getBazRx(Foo foo) {
return Mono.fromSupplier(() -> {
try {
return getBaz(foo);
} catch (IOException e) {
throw Exceptions.propagate(e); // or return null to ignore and complete empty
}
})
.subscribeOn(Schedulers.elastic()); // run on a scheduler suitable for blocking work
}
所以我有一个 Flux<Foo>
,我想将每个 Foo
映射到 Baz
。问题是,getBaz(Foo foo)
可能会抛出一个 IOException
.
所以我考虑过使用 Mono<Baz> getBazRx(Foo foo)
方法,在出现异常时 return 可以是 Mono.just(baz)
或 Mono.empty()
。
然后将以 Flux<Mono<Baz>>
结束,这提醒了 Optional<T>
容器。
Spring Reactor 就是这样做的吗?如何正确食用?
既然你想跳过这个错误(例如只是记录它),你可以使用onErrorContinue
。此外,由于 getBaz
抛出一个已检查的异常,我们需要捕获它并 return
(不抛出)一个 RuntimeException
代替。 Reactor 有一个实用方法可以做到这一点 Exceptions.propagate
:
flux
.map(foo -> {
try {
return getBaz(foo);
} catch (IOException e) {
return Exceptions.propagate(e);
}
})
.onErrorContinue(RuntimeException.class, (t, b) -> log.error(t))
.subscribe(baz -> log.info("Read value {}", baz));
在反应流中,"optionals" 通常通过从流中删除不存在的元素来处理(例如,一个空的 Mono
,或一个删除元素的 Flux
。),而不是有 Flux<Optional>
、Mono<Optional>
或 Flux<Mono>
调用同步 getBaz
方法时,可以使用单个 .handle
操作,如下所示:
flux
.handle((foo, sink) -> {
try {
// propagate Baz down the stream
sink.next(getBaz(foo));
} catch (IOException e) {
// Since sink.next is not called here,
// the problematic element will be dropped from the stream
log.error(e);
}
})
调用异步getBazRx
方法(返回Mono
)时,可以在flatMap
/flatMapSequential
/[=26=中使用onErrorResume
],像这样:
flux
.flatMap(foo -> getBazRx(foo)
.onErrorResume(t -> {
log.error(t);
return Mono.empty();
}))
(或者您可以将 .onErrorResume
移动到 .getBazRx
内,具体取决于您要捕获并忽略异常的位置)
此外,既然你在问题中提到了它......如果你要创建 getBazRx
来包装 getBaz
,你应该 永远不要 做如果 getBaz
有可能阻止,则类似这样:
Mono<Baz> getBazRx(Foo foo) {
// BAD!!!
try {
return Mono.just(getBaz(foo));
} catch (IOException e) {
return Mono.error(e) // or Mono.empty() if you want to ignore
}
}
该实现实际上只是一个同步方法模仿一个异步方法。它有两个问题:
- 工作立即完成,而不是在订阅返回的
Mono
之后
- 如果
getBaz
阻塞,您最终可能会阻塞事件循环
相反,您应该推迟工作,直到单声道被订阅,并且 运行 在 Scheduler
上用于阻塞操作的任何阻塞操作,如下所示:
Mono<Baz> getBazRx(Foo foo) {
return Mono.fromSupplier(() -> {
try {
return getBaz(foo);
} catch (IOException e) {
throw Exceptions.propagate(e); // or return null to ignore and complete empty
}
})
.subscribeOn(Schedulers.elastic()); // run on a scheduler suitable for blocking work
}