Spring Reactor: Optional<T>对应的class是什么?

Spring Reactor: What's the corresponding class to Optional<T>?

所以我有一个 Flux<Foo>,我想将每个 Foo 映射到 Baz。问题是,getBaz(Foo foo) 可能会抛出一个 IOException.

所以我考虑过使用 Mono<Baz> getBazRx(Foo foo) 方法,在出现异常时 return 可以是 Mono.just(baz)Mono.empty()

然后将以 Flux<Mono<Baz>> 结束,这提醒了 Optional<T> 容器。

Spring Reactor 就是这样做的吗?如何正确食用?

既然你想跳过这个错误(例如只是记录它),你可以使用onErrorContinue。此外,由于 getBaz 抛出一个已检查的异常,我们需要捕获它并 return (不抛出)一个 RuntimeException 代替。 Reactor 有一个实用方法可以做到这一点 Exceptions.propagate:

flux
  .map(foo -> {
      try {
        return getBaz(foo);
      } catch (IOException e) {
        return Exceptions.propagate(e);
      }
  })
  .onErrorContinue(RuntimeException.class, (t, b) -> log.error(t))
  .subscribe(baz -> log.info("Read value {}", baz));

在反应流中,"optionals" 通常通过从流中删除不存在的元素来处理(例如,一个空的 Mono,或一个删除元素的 Flux。),而不是有 Flux<Optional>Mono<Optional>Flux<Mono>

调用同步 getBaz 方法时,可以使用单个 .handle 操作,如下所示:

flux
    .handle((foo, sink) -> {
        try {
            // propagate Baz down the stream
            sink.next(getBaz(foo));
        } catch (IOException e) {
            // Since sink.next is not called here,
            // the problematic element will be dropped from the stream
            log.error(e);
        }
    })

调用异步getBazRx方法(返回Mono)时,可以在flatMap/flatMapSequential/[=26=中使用onErrorResume ],像这样:

flux
    .flatMap(foo -> getBazRx(foo)
        .onErrorResume(t -> {
            log.error(t);
            return Mono.empty();
        }))

(或者您可以将 .onErrorResume 移动到 .getBazRx 内,具体取决于您要捕获并忽略异常的位置)

此外,既然你在问题中提到了它......如果你要创建 getBazRx 来包装 getBaz,你应该 永远不要 做如果 getBaz 有可能阻止,则类似这样:

Mono<Baz> getBazRx(Foo foo) {
    // BAD!!!
    try {
        return Mono.just(getBaz(foo));
    } catch (IOException e) {
        return Mono.error(e)  // or Mono.empty() if you want to ignore
    }
}

该实现实际上只是一个同步方法模仿一个异步方法。它有两个问题:

  1. 工作立即完成,而不是在订阅返回的 Mono
  2. 之后
  3. 如果 getBaz 阻塞,您最终可能会阻塞事件循环

相反,您应该推迟工作,直到单声道被订阅,并且 运行 在 Scheduler 上用于阻塞操作的任何阻塞操作,如下所示:

Mono<Baz> getBazRx(Foo foo) {
    return Mono.fromSupplier(() -> {
            try {
                return getBaz(foo);
            } catch (IOException e) {
                throw Exceptions.propagate(e);  // or return null to ignore and complete empty
            }
        })
        .subscribeOn(Schedulers.elastic());  // run on a scheduler suitable for blocking work
}