运行 Spring Webflux 注释控制器中不同线程中的任务

Running Tasks in different thread in Spring Webflux Annotated controller

我有一个 spring Webflux 注释控制器如下,

 @RestController
public class TestBlockingController {

  Logger log = LoggerFactory.getLogger(this.getClass().getName());

  @GetMapping()
  public Mono<String> blockForXSeconds(@RequestParam("block-seconds") Integer blockSeconds) {
    return getStringMono();
  }

  private Mono<String> getStringMono() {
    Integer blockSeconds = 5;
    String type = new String();
    try {
      if (blockSeconds % 2 == 0) {
        Thread.sleep(blockSeconds * 1000);
        type = "EVEN";
      } else {
        Thread.sleep(blockSeconds * 1000);
        type = "ODD";
      }
    } catch (Exception e) {
      log.info("Got Exception");
    }
    log.info("Type of block-seconds: " + blockSeconds);
    return Mono.just(type);
  }
}

如何在与 Netty 服务器线程不同的线程中创建 getStringMono 运行。我面临的问题是,当我在服务器线程中 运行 时,我的吞吐量基本上减少了(每秒 2 个请求)。如何在单独的线程中制作 运行ning getStringMono。

您可以使用 subscribeOn 运算符将任务委托给不同的线程池:

Mono.defer(() -> getStringMono()).subscribeOn(Schedulers.elastic());

尽管如此,您必须注意,应不惜一切代价在反应式应用程序中避免这种类型的阻塞。如果可能,请使用支持非阻塞 IO 和 returns 承诺类型(Mono、CompletableFuture 等)的客户端。如果您只想人为延迟,请改用 Mono.delay

您可以使用 Mono.defer() 方法。 方法签名如下:

public static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier)

你的休息 API 应该是这样的。

@GetMapping()
public Mono<String> blockForXSeconds(@RequestParam("block-seconds") Integer blockSeconds) {
    return Mono.defer(() -> getStringMono());
}

defer 运算符是为了让这个源变得懒惰,每次有新订阅者时都会重新评估 lambda 的内容。这将增加您的 API 吞吐量。

可以查看详细分析