有限源上的反应流对未来事件保持开放
Reactive stream on finite source remaining open for future events
我正在使用 Spring Webflux、Spring 数据和用于非阻塞的 Project Reactor I/O (Spring Boot 2.0.0.M7)。
我的目标是创建类似股票代码的 API 以允许客户端根据某些条件从端点请求所有资源,并接收在初始请求后创建的新资源。 Reactive MongoDB 是后备存储。基本的 HandlerFunction
实现如下所示。
Mono<ServerResponse> getFoos(ServerRequest request) {
ok().contentType(TEXT_EVENT_STREAM)
.body(fooRepository.findAll(), Foo)
}
显然,这只是 returns 当前可用的所有 Foos
,然后 Publisher
关闭连接并且没有新的 Foos
发送到客户端。我的问题是关于使用什么模式向这个可以接受新条目的无限流添加?
- 与一些全局
Publisher Bean
连接,我在创建它们时将新的 Foos
写入
- 添加一个
onComplete
重新订阅 Repository
(使用一些条件来过滤重复的条目)
- 使用
repeat
并让客户端过滤重复项
- 还有别的吗?
如果您相应地配置您的 MongoDB 集合(它必须被限制),您可以使用可尾游标来实现您想要的,只需在您的存储库上添加一个 @Tailable
注释。见 Spring Data MongoDB reference documentation about infinite streams.
我正在使用 Spring Webflux、Spring 数据和用于非阻塞的 Project Reactor I/O (Spring Boot 2.0.0.M7)。
我的目标是创建类似股票代码的 API 以允许客户端根据某些条件从端点请求所有资源,并接收在初始请求后创建的新资源。 Reactive MongoDB 是后备存储。基本的 HandlerFunction
实现如下所示。
Mono<ServerResponse> getFoos(ServerRequest request) {
ok().contentType(TEXT_EVENT_STREAM)
.body(fooRepository.findAll(), Foo)
}
显然,这只是 returns 当前可用的所有 Foos
,然后 Publisher
关闭连接并且没有新的 Foos
发送到客户端。我的问题是关于使用什么模式向这个可以接受新条目的无限流添加?
- 与一些全局
Publisher Bean
连接,我在创建它们时将新的Foos
写入 - 添加一个
onComplete
重新订阅Repository
(使用一些条件来过滤重复的条目) - 使用
repeat
并让客户端过滤重复项 - 还有别的吗?
如果您相应地配置您的 MongoDB 集合(它必须被限制),您可以使用可尾游标来实现您想要的,只需在您的存储库上添加一个 @Tailable
注释。见 Spring Data MongoDB reference documentation about infinite streams.