Reactor - understanding thread pools in .flatMap()

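I am trying to understand how reactive programming really works. I prepared a simple demo for this: the reactive WebClient from Spring Framework sends requests to a simple REST API, and the client prints the thread name in each operation.

The REST API: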

@RestController
@SpringBootApplication
public class RestApiApplication {

    public static void main(String[] args) {
        SpringApplication.run(RestApiApplication.class, args);
    }

    @PostMapping("/resource")
    public void consumeResource(@RequestBody Resource resource) {
        System.out.println(String.format("consumed resource: %s", resource.toString()));
    }
}

@Data
@AllArgsConstructor
class Resource {
    private final Long id;
    private final String name;
}

And, most importantly, the reactive web client:

@SpringBootApplication
public class ReactorWebclientApplication {

    public static void main(String[] args) {
        SpringApplication.run(ReactorWebclientApplication.class, args);
    }

    private final TcpClient tcpClient = TcpClient.create();

    private final WebClient webClient = WebClient.builder()
        .clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient)))
        .baseUrl("http://localhost:8080")
        .build();

    @PostConstruct
    void doRequests() {
        var longs = LongStream.range(1L, 10_000L)
            .boxed()
            .toArray(Long[]::new);

        var longsStream = Stream.of(longs);

        Flux.fromStream(longsStream)
            .map(l -> {
                System.out.println(String.format("------- map [%s] --------", Thread.currentThread().getName()));
                return new Resource(l, String.format("name %s", l));
            })
            .filter(res -> {
                System.out.println(String.format("------- filter [%s] --------", Thread.currentThread().getName()));
                return !res.getId().equals(11_000L);
            })
            .flatMap(res -> {
                System.out.println(String.format("------- flatmap [%s] --------", Thread.currentThread().getName()));
                return webClient.post()
                    .uri("/resource")
                    .syncBody(res)
                    .header("Content-Type", "application/json")
                    .header("Accept", "application/json")
                    .retrieve()
                    .bodyToMono(Resource.class)
                    .doOnSuccess(ignore -> System.out.println(String.format("------- onsuccess [%s] --------", Thread.currentThread().getName())))
                    .doOnError(ignore -> System.out.println(String.format("------- onerror [%s] --------", Thread.currentThread().getName())));
            })
            .blockLast();
    }

}

@JsonIgnoreProperties(ignoreUnknown = true)
class Resource {
    private final Long id;
    private final String name;

    @JsonCreator
    Resource(@JsonProperty("id") Long id, @JsonProperty("name")  String name) {
        this.id = id;
        this.name = name;
    }

    Long getId() {
        return id;
    }

    String getName() {
        return name;
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("Resource{");
        sb.append("id=").append(id);
        sb.append(", name='").append(name).append('\'');
        sb.append('}');
        return sb.toString();
    }
}

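The problem is that the behavior is different from what I expected.

I expected every call of .map().filter().flatMap() to be executed on the main thread, and every call of .doOnSuccess() or .doOnError() to be executed on a thread from the nio thread pool. So I expected the logs to look like: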

------- map [main] --------
------- filter [main] --------
------- flatmap [main] --------
(and so on...)
------- onsuccess [reactor-http-nio-2] --------
(and so on...)

But the logs I actually get are:

------- map [main] --------
------- filter [main] --------
------- flatmap [main] --------
------- map [main] --------
------- filter [main] --------
------- flatmap [main] --------
------- onsuccess [reactor-http-nio-2] --------
------- onsuccess [reactor-http-nio-6] --------
------- onsuccess [reactor-http-nio-4] --------
------- onsuccess [reactor-http-nio-8] --------
------- map [reactor-http-nio-2] --------
------- filter [reactor-http-nio-2] --------
------- flatmap [reactor-http-nio-2] --------
------- map [reactor-http-nio-2] --------

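After that, every subsequent log line from .map().filter().flatMap() is printed on a thread from reactor-http-nio.

The next thing I cannot explain is that the ratio between operations executed on the main thread and on reactor-http-nio threads is different on every run. Sometimes all of the .map().filter().flatMap() operations are executed on the main thread.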

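Reactor, like RxJava, can be considered concurrency-agnostic. That is, it does not enforce a concurrency model. Rather, it leaves you, the developer, in command. However, that does not prevent the library from helping you with concurrency.

Obtaining a Flux or a Mono does not necessarily mean that it runs in a dedicated Thread. Instead, most operators continue working in the Thread on which the previous operator executed. Unless specified, the topmost operator (the source) itself runs on the Thread in which the subscribe() call was made.

This is described in the Project Reactor reference documentation, in the section on threading and schedulers.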
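To make the rule above concrete, here is a minimal sketch that records the thread on which map() runs, first with no scheduler at all and then behind a subscribeOn. The class name, the helper mapThreads and the scheduler name "sub-demo" are hypothetical, chosen only for this illustration; it assumes reactor-core is on the classpath.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class SubscribeThreadDemo {

    // Records the name of the thread on which map() runs for each element.
    static List<String> mapThreads(Flux<Integer> source) {
        return source
            .map(i -> Thread.currentThread().getName())
            .collectList()
            .block(); // block() subscribes on the calling thread
    }

    public static void main(String[] args) {
        // No scheduler involved: the source and map() run on the subscribing thread.
        System.out.println(mapThreads(Flux.range(1, 3)));

        // subscribeOn moves the subscription, and with it the source, to another thread.
        Scheduler scheduler = Schedulers.newSingle("sub-demo"); // hypothetical name
        try {
            System.out.println(mapThreads(Flux.range(1, 3).subscribeOn(scheduler)));
        } finally {
            scheduler.dispose(); // newSingle creates a non-daemon thread by default
        }
    }
}
```

Run from main, the first list should contain only the main thread's name, and the second only the name of the "sub-demo" worker thread.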

Regarding your code, the following snippet:

webClient.post()
         .uri("/resource")
         .syncBody(res)
         .header("Content-Type", "application/json")
         .header("Accept", "application/json")
         .retrieve()
         .bodyToMono(Resource.class)

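causes a thread switch from main to netty's worker pool. Afterwards, all the following actions are performed by the netty worker thread. As far as I can tell, this also explains why the upstream .map() and .filter() show up on reactor-http-nio threads: when an inner Mono completes, flatMap requests more elements from the source on the thread that delivered the completion, and a synchronous source such as Flux.fromStream emits on whichever thread makes the request. If the main thread is still busy emitting at that moment, it keeps doing the work, which is why the ratio varies from run to run.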
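The effect can be reproduced without any HTTP call. The sketch below is only an approximation of your setup: a Mono.delay on a scheduler named "fake-io" (a hypothetical stand-in for the reactor-http-nio threads) plays the role of the WebClient call, and the flatMap concurrency is set to 1 so that the hand-over happens after the very first element. The class and method names are made up for the illustration.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class FlatMapThreadHopDemo {

    // Returns the names of the threads on which the upstream map() ran, in emission order.
    static List<String> upstreamThreads() {
        List<String> threads = new CopyOnWriteArrayList<>();
        Scheduler fakeIo = Schedulers.newSingle("fake-io"); // stands in for reactor-http-nio
        try {
            Flux.range(1, 4)
                .map(i -> {
                    threads.add(Thread.currentThread().getName());
                    return i;
                })
                // Concurrency 1: the next element is requested only once the inner Mono completes,
                // and that request is made from the thread that completed it.
                .flatMap(i -> Mono.delay(Duration.ofMillis(50), fakeIo).map(tick -> i), 1)
                .blockLast();
        } finally {
            fakeIo.dispose();
        }
        return threads;
    }

    public static void main(String[] args) {
        upstreamThreads().forEach(System.out::println);
    }
}
```

Only the first element should be mapped on the calling thread. Once the first inner Mono completes on the fake-io thread, the remaining elements are typically pulled through map() on that thread as well. With the default concurrency of 256 and real network latency, the hand-over point moves around, which matches the varying ratio you observe.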

If you want to control this behavior, you should add a publishOn(...) statement to your code, for example:

webClient.post()
         .uri("/resource")
         .syncBody(res)
         .header("Content-Type", "application/json")
         .header("Accept", "application/json")
         .retrieve()
         .bodyToMono(Resource.class)
         .publishOn(Schedulers.elastic())

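This way, every subsequent action is executed by the elastic scheduler's thread pool.

Another example is using a dedicated scheduler for heavy tasks that follow the execution of the HTTP request.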
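As a small self-contained illustration of what publishOn changes, the following sketch records the thread before and after the operator. The class name, the hops helper and the scheduler name "publish-demo" are hypothetical. The same pattern should apply with Schedulers.elastic(), or with Schedulers.boundedElastic() on newer Reactor versions where elastic() is deprecated.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class PublishOnDemo {

    // Returns one "<thread before publishOn> -> <thread after publishOn>" entry per element.
    static List<String> hops(Scheduler target) {
        return Flux.range(1, 3)
            .map(i -> Thread.currentThread().getName())   // upstream of publishOn
            .publishOn(target)                            // signals are re-emitted on the target scheduler
            .map(before -> before + " -> " + Thread.currentThread().getName()) // downstream
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        Scheduler target = Schedulers.newSingle("publish-demo"); // hypothetical name
        try {
            hops(target).forEach(System.out::println);
        } finally {
            target.dispose();
        }
    }
}
```

Operators above publishOn stay on the thread that drives the source, and everything below it runs on a worker of the target scheduler until another publishOn or an asynchronous operator switches threads again.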

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;

import com.github.tomakehurst.wiremock.WireMockServer;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import ru.lanwen.wiremock.ext.WiremockResolver;
import ru.lanwen.wiremock.ext.WiremockResolver.Wiremock;
import ru.lanwen.wiremock.ext.WiremockUriResolver;
import ru.lanwen.wiremock.ext.WiremockUriResolver.WiremockUri;

@ExtendWith({
  WiremockResolver.class,
  WiremockUriResolver.class
})
public class ReactiveThreadsControlTest {

  private static int concurrency = 1;

  private final WebClient webClient = WebClient.create();

  @Test
  public void slowServerResponsesTest(@Wiremock WireMockServer server, @WiremockUri String uri) {

    String requestUri = "/slow-response";

    server.stubFor(get(urlEqualTo(requestUri))
      .willReturn(aResponse().withStatus(200)
        .withFixedDelay((int) TimeUnit.SECONDS.toMillis(2)))
    );

    Flux
      .generate(() -> Integer.valueOf(1), (i, sink) -> {
        System.out.println(String.format("[%s] Emitting next value: %d", Thread.currentThread().getName(), i));
        sink.next(i);
        return i + 1;
      })
      .subscribeOn(Schedulers.single())
      .flatMap(i ->
          executeGet(uri + requestUri)
            .publishOn(Schedulers.elastic())
            .map(response -> {
              heavyTask();
              return true;
            })
        , concurrency)
      .subscribe();

    blockForever();
  }

  private void blockForever() {
    Object monitor = new Object();

    synchronized (monitor) {
      try {
        monitor.wait();
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
      }
    }
  }


  private Mono<ClientResponse> executeGet(String path) {
    System.out.println(String.format("[%s] About to execute an HTTP GET request: %s", Thread.currentThread().getName(), path));
    return webClient
      .get()
      .uri(path)
      .exchange();
  }

  private void heavyTask() {
    try {
      System.out.println(String.format("[%s] About to execute a heavy task", Thread.currentThread().getName()));
      Thread.sleep(TimeUnit.SECONDS.toMillis(20));
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
    }
  }
}
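If you do not want to set up WireMock, the same threading layout can be observed with a reduced sketch. It is not the test above, only an approximation under stated assumptions: a Mono.delay stands in for the slow HTTP response, a log entry stands in for the heavy task, and Schedulers.boundedElastic() (available in Reactor 3.3 and later) replaces elastic(). The class name, the scheduler name "source" and the log format are hypothetical.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class PinnedSourceDemo {

    // Runs a small pipeline and returns log entries of the form "<stage>@<thread>".
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        Scheduler source = Schedulers.newSingle("source"); // hypothetical name
        try {
            Flux.range(1, 3)
                .doOnNext(i -> log.add("emit@" + Thread.currentThread().getName()))
                // subscribeOn also routes later requests back to its own worker,
                // so the source keeps emitting on the "source" thread.
                .subscribeOn(source)
                .flatMap(i ->
                    Mono.delay(Duration.ofMillis(20))           // stands in for the slow HTTP call
                        .publishOn(Schedulers.boundedElastic()) // move the heavy part off the timer thread
                        .map(tick -> {
                            log.add("heavy@" + Thread.currentThread().getName());
                            return i;
                        }), 1)
                .blockLast();
        } finally {
            source.dispose();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Every "emit" entry should name the source thread and every "heavy" entry a boundedElastic thread, so the thread that produces values and the threads that wait for responses never run the expensive step.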