返回 Mono<ServerResponse> 会导致(邪恶的)同步阻塞 client/server 通信吗?
Will returning a Mono<ServerResponse> result in (evil) synchronous, blocking client/server comms?
我是 Spring Reactor 和 WebFlux 的新手,对 Spring 功能性 Web 中的事件流有点困惑。
示例:我有一个处理函数 returning a Mono<ServerResponse>
。在其中,一个 findAll()
存储库方法被执行 returning a Flux<T>
。根据响应式宣言,为了异步、非阻塞和允许背压,我希望从存储库中为每个元素 return 看到一个 onNext()
。但是,在请求处理期间查看服务器日志,我只看到一个 onNext()
事件,这是有道理的,因为我的 return 类型是一个包含响应的 Mono
:
路由器功能
@Bean
public RouterFunction<ServerResponse> itemsRoute(ItemsHandler itemsHandler) {
return RouterFunctions
.route(GET(ITEMS_ENDPOINT_V2).and(accept(MediaType.APPLICATION_JSON))
, itemsHandler::getAll);
}
处理函数
Mono<ServerResponse> getAll(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(itemRepository.findAll(), Item.class)
.log("GET items");
}
事件日志
2020-05-10 15:10:51.744 INFO 19096 --- [ctor-http-nio-4] GET items : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
2020-05-10 15:10:51.744 INFO 19096 --- [ctor-http-nio-4] GET items : | request(unbounded)
2020-05-10 15:10:51.744 INFO 19096 --- [ctor-http-nio-4] GET items : | onNext(org.springframework.web.reactive.function.server.DefaultEntityResponseBuilder$DefaultEntityResponse@83426cc)
2020-05-10 15:10:51.745 INFO 19096 --- [ctor-http-nio-4] GET items : | onComplete()
相比之下,使用 Flux<T>
作为 return 类型实现经典的 Spring 注释控制器方法,我会看到 onNext()
的每个实例25=](即结果集中的每一项),对我来说看起来更 "correct"(客户端现在可以控制事件流等):
控制器
@GetMapping(ITEMS_ENDPOINT_V1)
public Flux<Item> getAll() {
return itemRepository
.findAll()
.log("GET items");
}
日志
2020-05-10 15:14:04.135 INFO 19096 --- [ctor-http-nio-5] GET items : onSubscribe(FluxOnErrorResume.ResumeSubscriber)
2020-05-10 15:14:04.136 INFO 19096 --- [ctor-http-nio-5] GET items : request(unbounded)
2020-05-10 15:14:04.137 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=An item, price=4999.0))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=Another item, price=7249.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=Yet another item, price=2399.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=And another item, price=699.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=Aaaaaand another one, price=1.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onComplete()
这令人困惑。让我详细说明一下:
- 使用
Mono<ServerResponse>
似乎是邪恶的,因为它将整个结果集封装在一个事件中,对我来说,这感觉就像打破了异步、非阻塞、支持背压的事件流的反应原则。这不会剥夺客户的控制权吗?对我来说,这看起来像是传统的、阻塞的、client/server 通信。
- 直接返回
Flux<T>
感觉好多了,因为它启用了每个结果的事件处理和背压控制。
我的问题是:
- 创建
Mono<ServerResponse>
有什么影响?这是否会导致阻塞、同步交互,仅当从存储库中读取所有项目时才发出 onNext()
?我会失去背压功能等吗?
- 如何让功能样式后端为结果集中的每个项目发送
onNext()
?
- 就完全响应式(即非阻塞、异步和反压兼容)的 return 类型的函数式处理函数而言,最佳实践是什么?我不确定
Mono<ServerResponse>
是否没有违反这些反应原则。
我可能完全错了或者遗漏了一些重要的东西。感谢您的帮助!
这完全取决于使用 ServerResponse
的客户端。根据 WebFlux 文档 (https://docs.spring.io/spring-framework/docs/5.2.x/spring-framework-reference/web-reactive.html#spring-webflux) 将处理程序函数设置为 return Mono<ServerResponse>
而不管 returned 项目的数量是 标准方式并且绝对fine - 只要客户端正确处理底层 Flux<T>
一切都很好。我的问题出现是因为我使用 curl
测试了端点,它无法检测到底层 Flux
。使用启用功能样式的客户端(如 org.springframework.web.reactive.function.client.WebClient
),Mono<ServerResponse>
可以首先反序列化为 Flux<T>
,启用所有不错的反应功能,并使我们的 onNext()
事件出现。
客户代码
像这样调用后端,将 ServerResponse 反序列化为 Flux:
@GetMapping(CLIENT_ITEMS_RESOURCE_ENDPOINT_URL)
public Flux<Item> getAllItems(@RequestParam(defaultValue = "true") boolean useRetrieve) {
return webClient.get().uri(SERVER_ITEMS_RESOURCE_V2_ENDPOINT_URL)
.retrieve()
.bodyToFlux(Item.class) // <-- de-serialize the ServerResponse into a Flux
.log("GET all items from server");
}
将导致查看所有 onNext()
事件,启用客户端事件处理:
2020-05-10 16:10:10.504 INFO 10000 --- [ctor-http-nio-2] GET all items from server : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
2020-05-10 16:10:10.504 INFO 10000 --- [ctor-http-nio-2] GET all items from server : request(unbounded)
2020-05-10 16:10:10.511 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=bla bla, price=4999.0))
2020-05-10 16:10:10.512 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=bla bla bla, price=7249.99))
2020-05-10 16:10:10.512 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=bla bla bla bla, price=2399.99))
2020-05-10 16:10:10.512 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=bla bla bla bla bla, price=699.99))
2020-05-10 16:10:10.512 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=another item, price=1.99))
2020-05-10 16:10:10.513 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onComplete()
因此,只要客户端对响应进行适当的处理,一切都很好且完全反应。
我是 Spring Reactor 和 WebFlux 的新手,对 Spring 功能性 Web 中的事件流有点困惑。
示例:我有一个处理函数 returning a Mono<ServerResponse>
。在其中,一个 findAll()
存储库方法被执行 returning a Flux<T>
。根据响应式宣言,为了异步、非阻塞和允许背压,我希望从存储库中为每个元素 return 看到一个 onNext()
。但是,在请求处理期间查看服务器日志,我只看到一个 onNext()
事件,这是有道理的,因为我的 return 类型是一个包含响应的 Mono
:
路由器功能
@Bean
public RouterFunction<ServerResponse> itemsRoute(ItemsHandler itemsHandler) {
return RouterFunctions
.route(GET(ITEMS_ENDPOINT_V2).and(accept(MediaType.APPLICATION_JSON))
, itemsHandler::getAll);
}
处理函数
Mono<ServerResponse> getAll(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(itemRepository.findAll(), Item.class)
.log("GET items");
}
事件日志
2020-05-10 15:10:51.744 INFO 19096 --- [ctor-http-nio-4] GET items : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
2020-05-10 15:10:51.744 INFO 19096 --- [ctor-http-nio-4] GET items : | request(unbounded)
2020-05-10 15:10:51.744 INFO 19096 --- [ctor-http-nio-4] GET items : | onNext(org.springframework.web.reactive.function.server.DefaultEntityResponseBuilder$DefaultEntityResponse@83426cc)
2020-05-10 15:10:51.745 INFO 19096 --- [ctor-http-nio-4] GET items : | onComplete()
相比之下,使用 Flux<T>
作为 return 类型实现经典的 Spring 注释控制器方法,我会看到 onNext()
的每个实例25=](即结果集中的每一项),对我来说看起来更 "correct"(客户端现在可以控制事件流等):
控制器
@GetMapping(ITEMS_ENDPOINT_V1)
public Flux<Item> getAll() {
return itemRepository
.findAll()
.log("GET items");
}
日志
2020-05-10 15:14:04.135 INFO 19096 --- [ctor-http-nio-5] GET items : onSubscribe(FluxOnErrorResume.ResumeSubscriber)
2020-05-10 15:14:04.136 INFO 19096 --- [ctor-http-nio-5] GET items : request(unbounded)
2020-05-10 15:14:04.137 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=An item, price=4999.0))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=Another item, price=7249.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=Yet another item, price=2399.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=And another item, price=699.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=Aaaaaand another one, price=1.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onComplete()
这令人困惑。让我详细说明一下:
- 使用
Mono<ServerResponse>
似乎是邪恶的,因为它将整个结果集封装在一个事件中,对我来说,这感觉就像打破了异步、非阻塞、支持背压的事件流的反应原则。这不会剥夺客户的控制权吗?对我来说,这看起来像是传统的、阻塞的、client/server 通信。 - 直接返回
Flux<T>
感觉好多了,因为它启用了每个结果的事件处理和背压控制。
我的问题是:
- 创建
Mono<ServerResponse>
有什么影响?这是否会导致阻塞、同步交互,仅当从存储库中读取所有项目时才发出onNext()
?我会失去背压功能等吗? - 如何让功能样式后端为结果集中的每个项目发送
onNext()
? - 就完全响应式(即非阻塞、异步和反压兼容)的 return 类型的函数式处理函数而言,最佳实践是什么?我不确定
Mono<ServerResponse>
是否没有违反这些反应原则。
我可能完全错了或者遗漏了一些重要的东西。感谢您的帮助!
这完全取决于使用 ServerResponse
的客户端。根据 WebFlux 文档 (https://docs.spring.io/spring-framework/docs/5.2.x/spring-framework-reference/web-reactive.html#spring-webflux) 将处理程序函数设置为 return Mono<ServerResponse>
而不管 returned 项目的数量是 标准方式并且绝对fine - 只要客户端正确处理底层 Flux<T>
一切都很好。我的问题出现是因为我使用 curl
测试了端点,它无法检测到底层 Flux
。使用启用功能样式的客户端(如 org.springframework.web.reactive.function.client.WebClient
),Mono<ServerResponse>
可以首先反序列化为 Flux<T>
,启用所有不错的反应功能,并使我们的 onNext()
事件出现。
客户代码
像这样调用后端,将 ServerResponse 反序列化为 Flux:
@GetMapping(CLIENT_ITEMS_RESOURCE_ENDPOINT_URL)
public Flux<Item> getAllItems(@RequestParam(defaultValue = "true") boolean useRetrieve) {
return webClient.get().uri(SERVER_ITEMS_RESOURCE_V2_ENDPOINT_URL)
.retrieve()
.bodyToFlux(Item.class) // <-- de-serialize the ServerResponse into a Flux
.log("GET all items from server");
}
将导致查看所有 onNext()
事件,启用客户端事件处理:
2020-05-10 16:10:10.504 INFO 10000 --- [ctor-http-nio-2] GET all items from server : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
2020-05-10 16:10:10.504 INFO 10000 --- [ctor-http-nio-2] GET all items from server : request(unbounded)
2020-05-10 16:10:10.511 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=bla bla, price=4999.0))
2020-05-10 16:10:10.512 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=bla bla bla, price=7249.99))
2020-05-10 16:10:10.512 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=bla bla bla bla, price=2399.99))
2020-05-10 16:10:10.512 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=bla bla bla bla bla, price=699.99))
2020-05-10 16:10:10.512 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=another item, price=1.99))
2020-05-10 16:10:10.513 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onComplete()
因此,只要客户端对响应进行适当的处理,一切都很好且完全反应。