在 Spring 引导客户端中接收 Flux
Receiving a Flux in a Spring Boot Client
这是
的后续问题
我尝试遵循如何使用 Spring WebClient 接收 Flux 的建议,但实际上遇到了一个问题。
在服务器端,代码是一个简单的控制器,公开了 Mongo 存储库的 findAll 方法:
@RequestMapping("power")
public Flux<Power> getAll() {
return powerRepository.findAll();
}
客户端消费代码如上答案:
@Bean
public CommandLineRunner readFromApiServer() {
return new CommandLineRunner() {
@Override
public void run(String... args) throws Exception {
WebClient webClient = WebClient.create("http://localhost:8080");
Flux<Power> flux = webClient.get().uri("/power").accept(MediaType.APPLICATION_STREAM_JSON).exchange()
.flatMap(response -> response.bodyToFlux(Power.class));
flux.subscribe();
}
};
}
但这会引发异常:
2017-02-27 08:19:41.281 ERROR 99026 --- [ctor-http-nio-5]
r.ipc.netty.channel.ChannelOperations : [HttpClient] Error
processing connection. Requesting close the channel
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at
io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:101)
~[netty-all-4.1.8.Final.jar:4.1.8.Final]
我正在使用当前的 Spring Boot 2.0.0 BUILD-SNAPSHOT。
这个异常告诉我什么?我怎样才能正确?
所有 CommandLineRunner
bean 在 Spring 引导应用程序启动时执行。如果没有守护线程(即当前应用程序不是 Web 应用程序),应用程序将在所有运行程序执行完毕后关闭。
在您的情况下,仅使用 flux.subscribe()
"Start the chain and request unbounded demand" (javadoc),因此此方法调用不会阻塞。我怀疑这个命令行运行器在你有机会用你的助焊剂做任何事情之前就回来了,应用程序关闭并且你的网络资源被关闭。
此外,您没有对 HTTP 请求的结果执行任何操作。
我认为使用以下内容更新您的代码示例应该可以解决问题:
@Bean
public CommandLineRunner readFromApiServer() {
return new CommandLineRunner() {
@Override
public void run(String... args) throws Exception {
WebClient webClient = WebClient.create("http://localhost:8080");
Flux<Power> flux = webClient.get().uri("/power").accept(MediaType.APPLICATION_STREAM_JSON).exchange()
.flatMap(response -> response.bodyToFlux(Power.class));
List<Power> powerList = flux.collectList().block();
// do something with the result?
}
};
}
这是
我尝试遵循如何使用 Spring WebClient 接收 Flux 的建议,但实际上遇到了一个问题。
在服务器端,代码是一个简单的控制器,公开了 Mongo 存储库的 findAll 方法:
@RequestMapping("power")
public Flux<Power> getAll() {
return powerRepository.findAll();
}
客户端消费代码如上答案:
@Bean
public CommandLineRunner readFromApiServer() {
return new CommandLineRunner() {
@Override
public void run(String... args) throws Exception {
WebClient webClient = WebClient.create("http://localhost:8080");
Flux<Power> flux = webClient.get().uri("/power").accept(MediaType.APPLICATION_STREAM_JSON).exchange()
.flatMap(response -> response.bodyToFlux(Power.class));
flux.subscribe();
}
};
}
但这会引发异常:
2017-02-27 08:19:41.281 ERROR 99026 --- [ctor-http-nio-5] r.ipc.netty.channel.ChannelOperations : [HttpClient] Error processing connection. Requesting close the channel
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1 at io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:101) ~[netty-all-4.1.8.Final.jar:4.1.8.Final]
我正在使用当前的 Spring Boot 2.0.0 BUILD-SNAPSHOT。
这个异常告诉我什么?我怎样才能正确?
所有 CommandLineRunner
bean 在 Spring 引导应用程序启动时执行。如果没有守护线程(即当前应用程序不是 Web 应用程序),应用程序将在所有运行程序执行完毕后关闭。
在您的情况下,仅使用 flux.subscribe()
"Start the chain and request unbounded demand" (javadoc),因此此方法调用不会阻塞。我怀疑这个命令行运行器在你有机会用你的助焊剂做任何事情之前就回来了,应用程序关闭并且你的网络资源被关闭。
此外,您没有对 HTTP 请求的结果执行任何操作。 我认为使用以下内容更新您的代码示例应该可以解决问题:
@Bean
public CommandLineRunner readFromApiServer() {
return new CommandLineRunner() {
@Override
public void run(String... args) throws Exception {
WebClient webClient = WebClient.create("http://localhost:8080");
Flux<Power> flux = webClient.get().uri("/power").accept(MediaType.APPLICATION_STREAM_JSON).exchange()
.flatMap(response -> response.bodyToFlux(Power.class));
List<Power> powerList = flux.collectList().block();
// do something with the result?
}
};
}