在 Reactive Stream 中阻止调用

Blocking calls in Reactive Stream

我想知道正常的 java API 调用(我的意思是没有 I/O 的方法)是否应该作为“迷你阻塞调用”线程化?像这样实现 Reactive Streams 是否可以接受(在 returning Publisher 之前调用方法):

  public Mono<String> doSomething(Object anyObject){
      validator.validate(anyObject); // it returns void so in this case it cannot be in filter
      return Mono.just(anyObject)
          .flatmap(service::process);
  }

而不是(在流中调用它)

  public Mono<String> doSomething(Object anyObject){
      return Mono.just(anyObject)
          .doOnNext(validator::validate)
          .flatmap(service::process);
  }

此验证器仅用于示例。该方法是否有任何缺点或这些方法在 return 语句之前应始终包含在流中?

I wonder if normal java API calls (I mean methods without I/O) should be threaded as a "mini blocking calls"?

我不太确定您在这里所说的迷你阻塞调用是什么意思 - 方法阻塞或不阻塞。如果它不阻塞是因为它不需要(该方法不涉及任何 IO 或其他阻塞操作)那么它只是一个标准的同步操作。

在这种情况下,我希望尽可能遵守两个相关原则:

  • 将所有内容都保留在反应链中(这使代码更清晰,并且意味着在下游工作的运算符具有适当的效果);
  • 除了琐碎的情况外,避免在所有情况下使用 Mono.just()(否则您看起来像是在使用一个值的“承诺”,而不是立即可用的值。)

因此,我更喜欢第二种方法,因为那里的一切显然都是反应链的一部分,并且这使得代码更容易阅读恕我直言 - 你不会在反应式和非反应式之间来回切换代码。

但是,请记住,您会将此方法用作更大反应链的一部分(很可能通过 flatMap() 调用),如果可能的话,实际上我更喜欢另一种方法 - 即改为使用 transform(),这意味着您可以提供一种方法,该方法既采用 Mono 作为参数,又采用 returns 一个:

public Mono<String> doSomething(Mono<String> mono){
  return mono
      .doOnNext(validator::validate)
      .flatmap(service::process);
}

这样就可以将所有内容保留为反应链的一部分,并且具有避免使用 Mono.just().

的额外优势