Project Reactor - 并行执行

Project Reactor - Parallel Execution

我有下面的助焊剂,

@Test
public void fluxWithRange_CustomTest() {
    Flux<Integer> intFlux = Flux.range(1, 10).flatMap(i -> {
        if (i % 2 == 0) {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            return Mono.just(i);
        } else {
            return Mono.just(i);
        }
    }, 2).subscribeOn(Schedulers.newBoundedElastic(2, 2, "test")).log();

    StepVerifier.create(intFlux).expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).verifyComplete();
}

我原以为它会 运行 并行执行,但是,它只在 1 个线程中执行。

当“某人”订阅您的 Flux 时,subscribeOn 方法仅提供一种将执行转移到不同线程的方法。这意味着当您使用 StepVerifier 时,您正在订阅通量,并且因为您定义了一个调度程序,所以执行被移动到调度程序提供的线程之一。这并不意味着 Flux 将在多个线程之间跳转。

您期望的行为可以通过添加第二个 subscribeOn 来归档,但是添加到您在 flatMap 中使用的 Mono。当 flatMap 现在订阅内容时,它将使用另一个线程。

如果您将代码更改为如下内容:

  @Test
  public void fluxWithRange_CustomTest() throws InterruptedException {
    Flux<Integer> intFlux = Flux.range(1, 10)
      .flatMap(i -> subFlux(i),2)
      .subscribeOn(Schedulers.newBoundedElastic(2, 2, "test")).log();

    StepVerifier.create(intFlux).expectNext(1, 2, 3, 4, 5, 6, 7, 8,9,10).verifyComplete(); //This now fails.

  }

  private Mono<Integer> subFlux(int i) {
    Mono<Integer> result = Mono.create(sink ->
    {
      if (i % 2 == 0) {
        try {
          Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
      }
      sink.success(i);
    });
    return result.subscribeOn(Schedulers.newBoundedElastic(2, 2, "other"));
  }