Reactor Flux 和异步处理
Reactor Flux and asynchronous processing
我正在尝试学习 Reactor,但遇到了很多麻烦。我想做一个非常简单的概念证明,我模拟调用慢速流服务 1 次或多次。如果您使用反应器并流式传输响应,则调用者不必等待所有结果。
所以我创建了一个非常简单的控制器,但它的行为并不像我预期的那样。当延迟在我的 flatMap“内部”(在我调用的方法内部)时,直到一切都完成后才会返回响应。但是当我在 flatMap 之后添加延迟时,数据被流式传输。
为什么这段代码会导致 JSON
的流
@GetMapping(value = "/test", produces = { MediaType.APPLICATION_STREAM_JSON_VALUE })
Flux<HashMap<String, Object>> customerCards(@PathVariable String customerId) {
Integer count = service.getCount(customerId);
return Flux.range(1, count).
flatMap(k -> service.doRestCall(k)).delayElements(Duration.ofMillis(5000));
}
但这并不
@GetMapping(value = "/test2", produces = { MediaType.APPLICATION_STREAM_JSON_VALUE })
Flux<HashMap<String, Object>> customerCards(@PathVariable String customerId) {
Integer count = service.getCount(customerId);
return Flux.range(1, count).
flatMap(k -> service.doRestCallWithDelay(k));
}
它认为我缺少一些非常基本的反应堆 API。在那张纸条上。谁能指出一本关于反应堆的好书或教程?我好像找不到什么好的东西学这个。
谢谢
这里是项目反应器reference guide
"delayElements" 方法仅将通量元素延迟给定的持续时间,请参阅 javadoc for more details
如果您需要更多帮助,我认为您应该 post 有关方法 "service.doRestCallWithDelay(k)" 和 "service.doRestCall(k)" 的详细信息。
flatMap
中的代码在主线程(即控制器运行的线程)上运行。结果整个过程被阻塞,方法不会立即 return。请记住,Reactor 不会强加特定的线程模型。
相反,根据文档,在 delayElements
方法中 signals are delayed and continue on the parallel default Scheduler
。这意味着主线程没有被阻塞并且 return 立即。
这里有两个对应的例子:
阻止代码:
Flux.range(1, 500)
.map(i -> {
//blocking code
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " - Item : " + i);
return i;
})
.subscribe();
System.out.println("main completed");
结果:
main - Item : 1
main - Item : 2
main - Item : 3
...
main - Item : 500
main completed
Non-blocking代码:
Flux.range(1, 500)
.delayElements(Duration.ofSeconds(1))
.subscribe(i -> {
System.out.println(Thread.currentThread().getName() + " - Item : " + i);
});
System.out.println("main Completed");
//sleep main thread in order to be able to print the println of the flux
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
结果:
main Completed
parallel-1 - Item : 1
parallel-2 - Item : 2
parallel-3 - Item : 3
parallel-4 - Item : 4
...
我正在尝试学习 Reactor,但遇到了很多麻烦。我想做一个非常简单的概念证明,我模拟调用慢速流服务 1 次或多次。如果您使用反应器并流式传输响应,则调用者不必等待所有结果。
所以我创建了一个非常简单的控制器,但它的行为并不像我预期的那样。当延迟在我的 flatMap“内部”(在我调用的方法内部)时,直到一切都完成后才会返回响应。但是当我在 flatMap 之后添加延迟时,数据被流式传输。
为什么这段代码会导致 JSON
的流 @GetMapping(value = "/test", produces = { MediaType.APPLICATION_STREAM_JSON_VALUE })
Flux<HashMap<String, Object>> customerCards(@PathVariable String customerId) {
Integer count = service.getCount(customerId);
return Flux.range(1, count).
flatMap(k -> service.doRestCall(k)).delayElements(Duration.ofMillis(5000));
}
但这并不
@GetMapping(value = "/test2", produces = { MediaType.APPLICATION_STREAM_JSON_VALUE })
Flux<HashMap<String, Object>> customerCards(@PathVariable String customerId) {
Integer count = service.getCount(customerId);
return Flux.range(1, count).
flatMap(k -> service.doRestCallWithDelay(k));
}
它认为我缺少一些非常基本的反应堆 API。在那张纸条上。谁能指出一本关于反应堆的好书或教程?我好像找不到什么好的东西学这个。
谢谢
这里是项目反应器reference guide "delayElements" 方法仅将通量元素延迟给定的持续时间,请参阅 javadoc for more details 如果您需要更多帮助,我认为您应该 post 有关方法 "service.doRestCallWithDelay(k)" 和 "service.doRestCall(k)" 的详细信息。
flatMap
中的代码在主线程(即控制器运行的线程)上运行。结果整个过程被阻塞,方法不会立即 return。请记住,Reactor 不会强加特定的线程模型。
相反,根据文档,在 delayElements
方法中 signals are delayed and continue on the parallel default Scheduler
。这意味着主线程没有被阻塞并且 return 立即。
这里有两个对应的例子:
阻止代码:
Flux.range(1, 500)
.map(i -> {
//blocking code
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " - Item : " + i);
return i;
})
.subscribe();
System.out.println("main completed");
结果:
main - Item : 1
main - Item : 2
main - Item : 3
...
main - Item : 500
main completed
Non-blocking代码:
Flux.range(1, 500)
.delayElements(Duration.ofSeconds(1))
.subscribe(i -> {
System.out.println(Thread.currentThread().getName() + " - Item : " + i);
});
System.out.println("main Completed");
//sleep main thread in order to be able to print the println of the flux
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
结果:
main Completed
parallel-1 - Item : 1
parallel-2 - Item : 2
parallel-3 - Item : 3
parallel-4 - Item : 4
...