有限源上的反应流对未来事件保持开放

Reactive stream on finite source remaining open for future events

我正在使用 Spring Webflux、Spring 数据和用于非阻塞的 Project Reactor I/O (Spring Boot 2.0.0.M7)。

我的目标是创建类似股票代码的 API 以允许客户端根据某些条件从端点请求所有资源,并接收在初始请求后创建的新资源。 Reactive MongoDB 是后备存储。基本的 HandlerFunction 实现如下所示。

Mono<ServerResponse> getFoos(ServerRequest request) {
    ok().contentType(TEXT_EVENT_STREAM)
            .body(fooRepository.findAll(), Foo)
}

显然,这只是 returns 当前可用的所有 Foos,然后 Publisher 关闭连接并且没有新的 Foos 发送到客户端。我的问题是关于使用什么模式向这个可以接受新条目的无限流添加?

  1. 与一些全局 Publisher Bean 连接,我在创建它们时将新的 Foos 写入
  2. 添加一个 onComplete 重新订阅 Repository(使用一些条件来过滤重复的条目)
  3. 使用repeat并让客户端过滤重复项
  4. 还有别的吗?

如果您相应地配置您的 MongoDB 集合(它必须被限制),您可以使用可尾游标来实现您想要的,只需在您的存储库上添加一个 @Tailable 注释。见 Spring Data MongoDB reference documentation about infinite streams.