使用 application/stream+json 时 WebFlux 不发送数据
WebFlux not sending data when using application/stream+json
我有一个响应式核心 WebClient post 给定端点。负载是 Job
个对象的通量,内容类型是 application/stream+json
Flux jobFlux = Flux.just(new Job());
Mono<JsonNode> response = localEP.post().uri( "/dev/job" )
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body( BodyInserters.fromObject(jobFlux))
.retrieve()
.bodyToMono( JsonNode.class );
在服务器端,我尝试了 Spring Controller 样式和 Spring Web Reactive FunctionHandler 来处理上述调用的负载,负载是 Flux。
@PostMapping(path = "/dev/job", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
@ResponseStatus( HttpStatus.CREATED )
public Mono<Void> loadJobs (@RequestBody Flux<Job> jobs) {
return this.repository.create(jobs);
}
域classJob
在实例化新对象时创建和id:
public Job() {
UUID guid = UUID.randomUUID();
id = guid.toString();
title = "Random String";
}
存储库当前只是一个存根:
@Repository
public class DemoJobRepository implements ReactiveRepository<Job> {
private static Logger logger = LoggerFactory.getLogger(DemoJobRepository.class);
private final List<Job> jobs = Lists.newArrayList();
@Override
public Mono<Void> create(Publisher<Job> jobStream) {
return Flux.from(jobStream).doOnNext(jobs::add).then();
}
@Override
public Flux<Job> getAll() {
return Flux.fromIterable(jobs);
}
@Override
public Mono<Job> findById(String id) {
return null;
}
}
我没有看到客户端试图发送请求体。我们在客户端上调用 block
来获取结果,我看到客户端发送了请求,但是服务器端点总是看到一个空的通量。拜托,非常感谢任何帮助。
对于反应类型,nothing happens until you subscribe - 构建反应管道不会执行它应该做的事情,只有订阅它才会启动该过程。
有多种订阅方式:
调用任何 subscribe
变体。其中许多将 lambda 作为参数,在处理完成或以错误结束时执行。使用空变体有点冒险,因为它只是启动执行,但您不会得到任何回调。从技术上讲,没有任何东西在等待它,因此 JVM 可以在处理完成之前退出
调用任何 block
/collect
方法。这不仅订阅而且 returns 期望值。
永远不要在 returns 反应类型的方法中完成这两个选择,否则这将导致您的应用程序出现严重问题。
我有一个响应式核心 WebClient post 给定端点。负载是 Job
个对象的通量,内容类型是 application/stream+json
Flux jobFlux = Flux.just(new Job());
Mono<JsonNode> response = localEP.post().uri( "/dev/job" )
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body( BodyInserters.fromObject(jobFlux))
.retrieve()
.bodyToMono( JsonNode.class );
在服务器端,我尝试了 Spring Controller 样式和 Spring Web Reactive FunctionHandler 来处理上述调用的负载,负载是 Flux。
@PostMapping(path = "/dev/job", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
@ResponseStatus( HttpStatus.CREATED )
public Mono<Void> loadJobs (@RequestBody Flux<Job> jobs) {
return this.repository.create(jobs);
}
域classJob
在实例化新对象时创建和id:
public Job() {
UUID guid = UUID.randomUUID();
id = guid.toString();
title = "Random String";
}
存储库当前只是一个存根:
@Repository
public class DemoJobRepository implements ReactiveRepository<Job> {
private static Logger logger = LoggerFactory.getLogger(DemoJobRepository.class);
private final List<Job> jobs = Lists.newArrayList();
@Override
public Mono<Void> create(Publisher<Job> jobStream) {
return Flux.from(jobStream).doOnNext(jobs::add).then();
}
@Override
public Flux<Job> getAll() {
return Flux.fromIterable(jobs);
}
@Override
public Mono<Job> findById(String id) {
return null;
}
}
我没有看到客户端试图发送请求体。我们在客户端上调用 block
来获取结果,我看到客户端发送了请求,但是服务器端点总是看到一个空的通量。拜托,非常感谢任何帮助。
对于反应类型,nothing happens until you subscribe - 构建反应管道不会执行它应该做的事情,只有订阅它才会启动该过程。
有多种订阅方式:
调用任何
subscribe
变体。其中许多将 lambda 作为参数,在处理完成或以错误结束时执行。使用空变体有点冒险,因为它只是启动执行,但您不会得到任何回调。从技术上讲,没有任何东西在等待它,因此 JVM 可以在处理完成之前退出调用任何
block
/collect
方法。这不仅订阅而且 returns 期望值。永远不要在 returns 反应类型的方法中完成这两个选择,否则这将导致您的应用程序出现严重问题。