Reactor Flux 和异步处理

Reactor Flux and asynchronous processing

我正在尝试学习 Reactor,但遇到了很多麻烦。我想做一个非常简单的概念证明,我模拟调用慢速流服务 1 次或多次。如果您使用反应器并流式传输响应,则调用者不必等待所有结果。

所以我创建了一个非常简单的控制器,但它的行为并不像我预期的那样。当延迟在我的 flatMap“内部”(在我调用的方法内部)时,直到一切都完成后才会返回响应。但是当我在 flatMap 之后添加延迟时,数据被流式传输。

为什么这段代码会导致 JSON

的流
    @GetMapping(value = "/test", produces = { MediaType.APPLICATION_STREAM_JSON_VALUE })
    Flux<HashMap<String, Object>> customerCards(@PathVariable String customerId) {
        Integer count = service.getCount(customerId);

        return Flux.range(1, count).
                flatMap(k -> service.doRestCall(k)).delayElements(Duration.ofMillis(5000));

    }

但这并不

    @GetMapping(value = "/test2", produces = { MediaType.APPLICATION_STREAM_JSON_VALUE })
    Flux<HashMap<String, Object>> customerCards(@PathVariable String customerId) {
        Integer count = service.getCount(customerId);

        return Flux.range(1, count).
                flatMap(k -> service.doRestCallWithDelay(k));

    }

它认为我缺少一些非常基本的反应堆 API。在那张纸条上。谁能指出一本关于反应堆的好书或教程?我好像找不到什么好的东西学这个。

谢谢

这里是项目反应器reference guide "delayElements" 方法仅将通量元素延迟给定的持续时间,请参阅 javadoc for more details 如果您需要更多帮助,我认为您应该 post 有关方法 "service.doRestCallWithDelay(k)" 和 "service.doRestCall(k)" 的详细信息。

flatMap 中的代码在主线程(即控制器运行的线程)上运行。结果整个过程被阻塞,方法不会立即 return。请记住,Reactor 不会强加特定的线程模型。

相反,根据文档,在 delayElements 方法中 signals are delayed and continue on the parallel default Scheduler。这意味着主线程没有被阻塞并且 return 立即。

这里有两个对应的例子:

阻止代码:

Flux.range(1, 500)
    .map(i -> {
            //blocking code
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " - Item : " + i);
            return i;
    })
   .subscribe();
    
    System.out.println("main completed");

结果:

main - Item : 1
main - Item : 2
main - Item : 3
...
main - Item : 500
main completed

Non-blocking代码:

Flux.range(1, 500)
    .delayElements(Duration.ofSeconds(1))
    .subscribe(i -> {
        System.out.println(Thread.currentThread().getName() + " - Item : " + i);
    });
    
System.out.println("main Completed");
    
//sleep main thread in order to be able to print the println of the flux
try {
    Thread.sleep(30000);
} catch (InterruptedException e) {
    e.printStackTrace();
}

结果:

main Completed
parallel-1 - Item : 1
parallel-2 - Item : 2
parallel-3 - Item : 3
parallel-4 - Item : 4
...