如何在 Spring Webflux / Reactor Netty Web 应用程序中执行阻塞调用
How to execute blocking calls within a Spring Webflux / Reactor Netty web application
在我有一个 Spring Webflux 微服务和 Reactor Netty 的用例中,我有以下依赖项:
org.springframework.boot.spring-boot-starter-webflux
(2.0.1.RELEASE)
org.springframework.boot.spring-boot-starter-data-mongodb-reactive
(2.0.1.RELEASE)
org.projectreactor.reactor-spring
(1.0.1.RELEASE)
对于一个非常具体的案例,我需要从我的 Mongo 数据库中检索一些信息,并将其处理为查询参数,并通过我的反应 WebClient
发送。由于 WebClient
和 UriComponentsBuilder
都接受发布者(Mono / Flux),所以我使用 #block()
调用来接收结果。
自从 reactor-core
(版本 0.7.6.RELEASE)已包含在最新的 spring-boot-dependencies
(版本 2.0.1.RELEASE)中,无法再使用: block()/blockFirst()/blockLast() are blocking, which is not supported in thread xxx
,参见 -> https://github.com/reactor/reactor-netty/issues/312
我的代码片段:
public Mono<FooBar> getFooBar(Foo foo) {
MultiValueMap<String, String> parameters = new LinkedMultiValueMap<>();
parameters.add("size", foo.getSize());
parameters.addAll("bars", barReactiveCrudRepository.findAllByIdentifierIn(foo.getBarIdentifiers()) // This obviously returns a Flux
.map(Bar::toString)
.collectList()
.block());
String url = UriComponentsBuilder.fromHttpUrl("https://base-url/")
.port(8081)
.path("/foo-bar")
.queryParams(parameters)
.build()
.toString();
return webClient.get()
.uri(url)
.retrieve()
.bodyToMono(FooBar.class);
}
这适用于 spring-boot
2.0 版。0.RELEASE,但自升级到 2.0 版后。1.RELEASE 因此从 reactor-core
升级到 0.7 版。6.RELEASE 不允许了。
我看到的唯一真正的解决方案是包括一个块(非反应性)存储库/mongo 客户端,但我不确定是否鼓励这样做。有什么建议么?
WebClient
不接受其请求 Publisher
类型 URL,但没有什么能阻止您执行以下操作:
public Mono<FooBar> getFooBar(Foo foo) {
Mono<List<String>> bars = barReactiveCrudRepository
.findAllByIdentifierIn(foo.getBarIdentifiers())
.map(Bar::toString)
.collectList();
Mono<FooBar> foobar = bars.flatMap(b -> {
MultiValueMap<String, String> parameters = new LinkedMultiValueMap<>();
parameters.add("size", foo.getSize());
parameters.addAll("bars", b);
String url = UriComponentsBuilder.fromHttpUrl("https://base-url/")
.port(8081)
.path("/foo-bar")
.queryParams(parameters)
.build()
.toString();
return webClient.get()
.uri(url)
.retrieve()
.bodyToMono(FooBar.class);
});
return foobar;
}
如果有的话,这个新的 reactor-core 检查使您免于在 WebFlux 处理程序中间使用此阻塞调用导致整个应用程序崩溃。
在我有一个 Spring Webflux 微服务和 Reactor Netty 的用例中,我有以下依赖项:
org.springframework.boot.spring-boot-starter-webflux
(2.0.1.RELEASE)org.springframework.boot.spring-boot-starter-data-mongodb-reactive
(2.0.1.RELEASE)org.projectreactor.reactor-spring
(1.0.1.RELEASE)
对于一个非常具体的案例,我需要从我的 Mongo 数据库中检索一些信息,并将其处理为查询参数,并通过我的反应 WebClient
发送。由于 WebClient
和 UriComponentsBuilder
都接受发布者(Mono / Flux),所以我使用 #block()
调用来接收结果。
自从 reactor-core
(版本 0.7.6.RELEASE)已包含在最新的 spring-boot-dependencies
(版本 2.0.1.RELEASE)中,无法再使用: block()/blockFirst()/blockLast() are blocking, which is not supported in thread xxx
,参见 -> https://github.com/reactor/reactor-netty/issues/312
我的代码片段:
public Mono<FooBar> getFooBar(Foo foo) {
MultiValueMap<String, String> parameters = new LinkedMultiValueMap<>();
parameters.add("size", foo.getSize());
parameters.addAll("bars", barReactiveCrudRepository.findAllByIdentifierIn(foo.getBarIdentifiers()) // This obviously returns a Flux
.map(Bar::toString)
.collectList()
.block());
String url = UriComponentsBuilder.fromHttpUrl("https://base-url/")
.port(8081)
.path("/foo-bar")
.queryParams(parameters)
.build()
.toString();
return webClient.get()
.uri(url)
.retrieve()
.bodyToMono(FooBar.class);
}
这适用于 spring-boot
2.0 版。0.RELEASE,但自升级到 2.0 版后。1.RELEASE 因此从 reactor-core
升级到 0.7 版。6.RELEASE 不允许了。
我看到的唯一真正的解决方案是包括一个块(非反应性)存储库/mongo 客户端,但我不确定是否鼓励这样做。有什么建议么?
WebClient
不接受其请求 Publisher
类型 URL,但没有什么能阻止您执行以下操作:
public Mono<FooBar> getFooBar(Foo foo) {
Mono<List<String>> bars = barReactiveCrudRepository
.findAllByIdentifierIn(foo.getBarIdentifiers())
.map(Bar::toString)
.collectList();
Mono<FooBar> foobar = bars.flatMap(b -> {
MultiValueMap<String, String> parameters = new LinkedMultiValueMap<>();
parameters.add("size", foo.getSize());
parameters.addAll("bars", b);
String url = UriComponentsBuilder.fromHttpUrl("https://base-url/")
.port(8081)
.path("/foo-bar")
.queryParams(parameters)
.build()
.toString();
return webClient.get()
.uri(url)
.retrieve()
.bodyToMono(FooBar.class);
});
return foobar;
}
如果有的话,这个新的 reactor-core 检查使您免于在 WebFlux 处理程序中间使用此阻塞调用导致整个应用程序崩溃。