从返回 Flux 的 WebClient 捕获 PrematureCloseException
Catching PrematureCloseException from WebClient returning a Flux
如何在使用org.springframework.web.reactive.function.client.WebClient
检索和无限reactor.core.publisher.Flux
时捕获reactor.netty.http.client.PrematureCloseException
?
为了重现该问题,我创建了两个简单的 Spring 启动应用程序。
两者都基于 org.springframework.boot:spring-boot-starter-webflux
.
服务器代码是:
@RestController
public class TestserverRestController {
@GetMapping(value="/huge-flux", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Long> hugeFlux() {
return Flux.range(1, Integer.MAX_VALUE).interval(Duration.ofSeconds(1));
}
@Autowired
ConfigurableApplicationContext springContext;
@GetMapping(value="/stop")
public void stop() {
springContext.close();
}
}
客户端代码为:
@Component
public class TestclientRunner implements CommandLineRunner {
@Override
public void run(String... args) throws Exception {
Flux<Long> flux = WebClient.create().get()
.uri("http://localhost:9000/huge-flux")
.accept(MediaType.APPLICATION_STREAM_JSON)
.retrieve()
.bodyToFlux(Long.class);
flux.subscribe(val -> System.out.printf("Value %d%n", val));
}
}
通过在网络浏览器中点击 http://localhost:9000/stop
客户端
被终止,并且这个错误出现在客户端的控制台上。
WARN 15635 --- [ctor-http-nio-2] reactor.netty.channel.FluxReceive : [id: 0x82a94759, L:0.0.0.0/0.0.0.0:61812] An exception has been observed post termination, use DEBUG level to see the full stack: reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
我希望能够捕获该错误以便我可以恢复。 (在我真实的
项目如果一个失败,将有备用服务器连接。)
我在客户端尝试过的包括:
flux.doOnCancel(()-> log.warning("CANCEL"));
flux.doOnTerminate(()-> log.warning("TERMINATE"));
flux.doOnComplete(()-> log.warning("COMPLETE"));
flux.doOnDiscard(Object.class, (o)-> log.warning("DISCARD"));
flux.doOnError((e)-> log.warning("ERROR"));
但是 none 这些日志消息会在服务器终止时打印出来。
通过将客户端代码中的 flux.subsribe()
行替换为:
flux.subscribe(
val -> System.out.printf("Value %d%n", val),
ex -> System.err.printf("ERROR CONSUMER [%s] %s", ex.getClass(), ex.getMessage()),
() -> System.err.printf("COMPLETE CONSUMER"));
问题可以由错误消费者捕获(打印 "ERROR CONSUMER")。
如何在使用org.springframework.web.reactive.function.client.WebClient
检索和无限reactor.core.publisher.Flux
时捕获reactor.netty.http.client.PrematureCloseException
?
为了重现该问题,我创建了两个简单的 Spring 启动应用程序。
两者都基于 org.springframework.boot:spring-boot-starter-webflux
.
服务器代码是:
@RestController
public class TestserverRestController {
@GetMapping(value="/huge-flux", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Long> hugeFlux() {
return Flux.range(1, Integer.MAX_VALUE).interval(Duration.ofSeconds(1));
}
@Autowired
ConfigurableApplicationContext springContext;
@GetMapping(value="/stop")
public void stop() {
springContext.close();
}
}
客户端代码为:
@Component
public class TestclientRunner implements CommandLineRunner {
@Override
public void run(String... args) throws Exception {
Flux<Long> flux = WebClient.create().get()
.uri("http://localhost:9000/huge-flux")
.accept(MediaType.APPLICATION_STREAM_JSON)
.retrieve()
.bodyToFlux(Long.class);
flux.subscribe(val -> System.out.printf("Value %d%n", val));
}
}
通过在网络浏览器中点击 http://localhost:9000/stop
客户端
被终止,并且这个错误出现在客户端的控制台上。
WARN 15635 --- [ctor-http-nio-2] reactor.netty.channel.FluxReceive : [id: 0x82a94759, L:0.0.0.0/0.0.0.0:61812] An exception has been observed post termination, use DEBUG level to see the full stack: reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
我希望能够捕获该错误以便我可以恢复。 (在我真实的 项目如果一个失败,将有备用服务器连接。)
我在客户端尝试过的包括:
flux.doOnCancel(()-> log.warning("CANCEL"));
flux.doOnTerminate(()-> log.warning("TERMINATE"));
flux.doOnComplete(()-> log.warning("COMPLETE"));
flux.doOnDiscard(Object.class, (o)-> log.warning("DISCARD"));
flux.doOnError((e)-> log.warning("ERROR"));
但是 none 这些日志消息会在服务器终止时打印出来。
通过将客户端代码中的 flux.subsribe()
行替换为:
flux.subscribe(
val -> System.out.printf("Value %d%n", val),
ex -> System.err.printf("ERROR CONSUMER [%s] %s", ex.getClass(), ex.getMessage()),
() -> System.err.printf("COMPLETE CONSUMER"));
问题可以由错误消费者捕获(打印 "ERROR CONSUMER")。