使用 application/stream+json 时 WebFlux 不发送数据

WebFlux not sending data when using application/stream+json

我有一个响应式核心 WebClient post 给定端点。负载是 Job 个对象的通量,内容类型是 application/stream+json Flux jobFlux = Flux.just(new Job());

Mono<JsonNode> response = localEP.post().uri( "/dev/job" )
    .contentType(MediaType.APPLICATION_STREAM_JSON)
    .body( BodyInserters.fromObject(jobFlux))
    .retrieve()
    .bodyToMono( JsonNode.class );

在服务器端,我尝试了 Spring Controller 样式和 Spring Web Reactive FunctionHandler 来处理上述调用的负载,负载是 Flux。

  @PostMapping(path = "/dev/job", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
  @ResponseStatus( HttpStatus.CREATED )
 public Mono<Void> loadJobs (@RequestBody Flux<Job> jobs) {
    return this.repository.create(jobs); 
 }

域classJob在实例化新对象时创建和id:

  public Job() {
    UUID guid = UUID.randomUUID();
    id = guid.toString();
    title = "Random String";
  }

存储库当前只是一个存根:

@Repository
public class DemoJobRepository implements ReactiveRepository<Job> {
   private static Logger logger = LoggerFactory.getLogger(DemoJobRepository.class);
   private final List<Job> jobs = Lists.newArrayList();

@Override
public Mono<Void> create(Publisher<Job> jobStream) {
    return Flux.from(jobStream).doOnNext(jobs::add).then();
}

@Override
public Flux<Job> getAll() {
    return Flux.fromIterable(jobs);
}

@Override
public Mono<Job> findById(String id) {
    return null;
}
}

我没有看到客户端试图发送请求体。我们在客户端上调用 block 来获取结果,我看到客户端发送了请求,但是服务器端点总是看到一个空的通量。拜托,非常感谢任何帮助。

对于反应类型,nothing happens until you subscribe - 构建反应管道不会执行它应该做的事情,只有订阅它才会启动该过程。

有多种订阅方式:

  • 调用任何 subscribe 变体。其中许多将 lambda 作为参数,在处理完成或以错误结束时执行。使用空变体有点冒险,因为它只是启动执行,但您不会得到任何回调。从技术上讲,没有任何东西在等待它,因此 JVM 可以在处理完成之前退出

  • 调用任何 block/collect 方法。这不仅订阅而且 returns 期望值。

    永远不要在 returns 反应类型的方法中完成这两个选择,否则这将导致您的应用程序出现严重问题。