Java 反应堆 - 链 Mono<Void> 与另一个异步任务生成 Mono<Object>

Java reactor - chain Mono<Void> with another async task producing Mono<Object>

我有以下异步任务:

public class AsyncValidationTask {
    // Returns Mono.error(new Exception()) if error, otherwise Mono.empty()
    public Mono<Void> execute(Object o);
}
public class AsyncSaveTask {
    // Returns Mono.error(new Exception()) if error, otherwise Mono of Object
    public Mono<Object> execute(Object o);
}

及以下服务 class:

public class AsyncService {

    private AsyncValidationTask validation;

    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   // Right now, the problem is that when validation completes successfully, it 
                   // emits Mono.empty hence the flatMap chained below will not be invoked at all.
                   .flatMap(dontcare -> this.save.execute(o))
    }
}

如上所示,如果 AsyncValidationTask.execute 成功完成,我尝试使用 flatMap 链接 AsyncSaveTask.execute 调用,它不会工作,因为没有发出任何内容(Mono.empty) 完成后。

我还考虑 then 链接第二个调用,但它总是会调用链接的调用,而不管第一个验证调用产生的 Mono.error。

如何正确链接它们?

.then 仅供终端源链接

使用.then,以便链接您的执行与进程,它只发送一个终端信号。

另外,注意,如果你需要对错误信号做些什么,那么你必须事先伴随着你的.thenonErrorResume

public class AsyncService {

    private AsyncValidationTask validation;

    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   .onErrorResume(t -> ...) // should be before then
                   .then(this.save.execute(o))
    }
}

.defer 为了推迟单声道创作

为了仅在验证成功的情况下执行 this.save.execute(o),您还必须将其包装在 Mono.defer 中:

public class AsyncService {

    private AsyncValidationTask validation;

    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   .onErrorResume(t -> ...) // should be before then
                   .then(Mono.defer(() -> this.save.execute(o)))
    }
}

通常不需要,因为Mono是一个 LAZY 类型 SHOULD start 仅在 情况下工作订阅 发生了(订阅== .subscribe())。

Mono#then 的实现保证 this.save.execute 方法返回的 Mono 订阅开始 Mono.defer(() -> this.validation.execute(o)) 完成 之后。

执行可能提前开始的唯一原因可能是目的(例如,有意提供此类行为的业务逻辑 - 缓存;热源等)OR INCORRECT 实现 this.save.execute(o) 无论实际订阅如何,它都会开始工作。

正确设计您的实现

一般来说,确保 API 确实有效并将其公开为 Publisher(例如 Mono | Flux)是一个很好的做法懒惰的。

这意味着 API 创建者必须确保只有在用户订阅了给定的 Publisher 实例的情况下才会执行工作。

例如,如果您的异步 API 在下面进行 CompletableFuture 创建,则值得手动将您的 CompletableFuture 创建包装到 Mono.defer 中或使用适当的方法扩展,例如 Mono.fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier)

执行器示例

让我们考虑如何使常规 ThreadPool 任务提交 Reactive。

interface Executor  {
  Future<T> execute(Callable<T> runnable); 
}

因此,为了使 Executor 具有响应性,我们必须创建如下内容:

interface ReactiveExecutor {
   Mono<T> execute(Callable<T> runnable);
}

实施不正确

以下是此类适配器的可能实现:

class ReactiveExecutorAdapter {
   final Executor delegate;

   ...


   Mono<T> execute(Callable<T> runnable) {
      MonoProcessor<T> result = MonoProcessor.create();
      Future<T> task = delegate.execute(() -> {
          T value = runnable.call();
          result.onNext(value);
          result.onComplet();
          return value;
      });

      return result.doOnCancel(() -> task.cancel());
   }
}

当然,这样的实现会起作用。但是,它有几个关键问题:

  1. 执行从方法调用开始(这与反应流的惰性行为有些矛盾Publisher
  2. 由于执行在实际任务订阅之前开始,我们必须创建一个有状态的Mono,它支持稍后的订阅。
  3. 此实现不处理根本没有订阅者的情况(例如,执行已经开始,但没有发生 .subscribe 方法(然后我们得到无法处理的值泄漏)
  4. 总的来说太hacky了,无法成为解决方案。此外,为了防止前面提到的所有情况,有必要在实现之外用 Mono.defer 包装 Mono execute(..) 上的每个调用(请参阅问题中的原始问题)。随后,它导致 API 用户很容易 'shoot your self in the foot' 忘记用额外的 .defer
  5. 包装执行

那么,怎么解决呢?

基本上,将 Mono.defer 移动到库内部就足够了。这将使 API 用户的生活更轻松,因为他们不必考虑何时需要使用延迟(因此 - 可能出现的问题更少)。

例如,我们的 Reactive Executor 最简单的解决方案可以是:

class ReactiveExecutorAdapter {
   final Executor delegate;

   ...


   Mono<T> execute(Callable<T> runnable) {
      Mono.defer(() -> {
          MonoProcessor<T> result = MonoProcessor.create();
          Future<T> task = delegate.execute(() -> {
              T value = runnable.call();
              result.onNext(value);
              result.onComplet();
              return value;
          });

          return result.doOnCancel(() -> task.cancel());
     })
   }
}

通过延迟执行,我们至少可以解决一个问题——确保值不再泄露。

但是,如何正确解决呢?

但是,为了解决该特定情况下所有可能出现的问题,我们可以使用 Mono.create,它是为采用异步而设计的 API:

class ReactiveExecutorAdapter {
   final Executor delegate;

   ...


   Mono<T> execute(Callable<T> runnable) {
      Mono.crete(monoSink -> {

          Future<T> task = delegate.execute(() -> {
              T value = runnable.call();
              monoSink.complete(value);
          });

          monoSink.doOnCancel(task::cancel);
     })
   }
}

使用 Mono.create 我们可以保证对每个订阅者进行惰性执行。 此外,使用 MonoSink API,我们可以快速挂接来自订阅者的所有基本信号。 最后,Mono.create 确保在发生任何情况时,该值将被适当丢弃。

最后,有了这样的 API 就没有必要对所有情况都使用 defer

不确定我是否正确理解了这个问题,但是..看看 OP 问题中方法的签名,对于 ExecutorService,它们看起来并不像 "async tasks",它们看起来像仅仅是 Mono 生产方法,在 'reactive' 世界中经常遇到的东西。那么问题就是将它们与

之类的东西结合起来
Mono<Object> validateAndSave(Object o) {
    return validation.execute(o)
               .then(save.execute(o));

.then 将忽略源发出的元素(即 validation.execute),但不会忽略错误信号。所以在 onError 的情况下,你的方法将 return Mono.error(),否则它将 return what save.execute returned.