使用 fromCallable 时 onSubscribe 来得太迟

onSubscribe comes too late when using fromCallable

我经常需要在 onSubscribe 和 onFinally 执行一些逻辑,这可以通过

方便地实现
private <T> Mono<T> doAtStartAndEnd(Mono<T> source) {
  return source.doOnSubscribe((s) -> {
      System.out.println("ON SUBSCRIBE");
    }).doFinally((f) -> {
      System.out.println("ON FINALLY");
    });
}

并通过 transform 在以下链中使用它:

List<String> result = Mono.fromCallable(() -> getListOfStrings())
// .log()
  .flatMapIterable(list -> list)
  .map(String::toUpperCase)
  .collectList()
  .transform(this::doAtStartAndEnd)
  .block();

当然,预期的行为是 ON SUBSCRIBE 出现在控制台中,在调用可调用对象之前,这里是 getListOfStrings()。但是 MonoFlattenIterable 的订阅逻辑会导致相反的行为。这不仅适用于 flatMapIterable,而且适用于 zip 等各种其他运算符。

如果我取消注释带有 .log() 的行,链会按预期运行。

也许这与 Reactive Gem #22 中的完全相同,但是我怎样才能实现所需的行为,而不用再次包装 Mono/Flux,例如在 Mono.defer(() -> Mono.fromCallable(() -> getListOfStrings()))?

flatMapIterable 试图避免在源被检测为 Callable 时执行完整的 RS 订阅和请求周期来获取 Iterable。相反,它直接调用 call() 方法。

这适用于 Flux.just(myList).flatMapIterable(list -> list) 这样的情况。但是,我认为在 Mono.fromCallable...

上这样做可能过于急切

抑制检测到此优化的 Mono.fromCallable() 的一种方法是在它之后使用 .hide()。至于 .log() 这会将 flatMapIterable 可见的实例更改为非 Callable.

的实例