Spring 带有 ReactiveMongoRepository 的 WebFlux:未通过流获取数据库更新

Spring WebFlux with ReactiveMongoRepository: not getting database updates through the stream

我面临的问题:

无论我尝试使用 Spring Reactive (WebFlux) REST API 的各种教程,我都无法让它工作。当我最初调用我的端点时,我能够从 MongoDB 集合中获取结果。但是,每当我更新文档条目或添加新文档时,它都不会通过文本事件流进行更新。每次我都必须再次调用端点以获得新结果。

设置:

目前我有以下设置:

我正在使用 Spring Webflux、Spring 云网关和 Spring ReactiveMongoRepository。

Spring 引导主服务的依赖项包括 pom.xml:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.security</groupId>
            <artifactId>spring-security-oauth2-jose</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
        </dependency>

ReactiveMongoRepository 的代码:

@Repository
public interface TestRepository extends ReactiveMongoRepository<TestIntegration, String> {

    @Query(("{'userId': ?0}"))
    Flux<TestIntegration> findbyUserId(String userId);
}

其余Controller的代码:

@GetMapping(value = "main/integrations", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<TestIntegration> retrieveIntegrations(ServerWebExchange exchange) {
        return testRepository.findAll();
}

根据我读过的所有 tutorial/guideline,这应该是按照这种方法工作的。 有没有人也经历过这种情况或者可以帮助解决这个问题?目前坚持了几天...

这按预期工作。当您从 Flux<T> 流式传输项目时,只要流中有项目,它就会提供。然后流将关闭。因此,在您的情况下,它从数据库中获取所有数据,将其流式传输给您,然后关闭。

如果您希望保持流打开,您需要继续发送数据。一种方法是使用 ServerSentsEvents 发送 :keep alive 消息(以逗号开头的消息)。您可以在 official mozilla documentation.

中阅读有关 ServerSentEventscolon operator 以及这些类型的消息的更多信息

当您能够实际保持流打开并希望发送数据时,您的服务将不知道新数据何时写入数据库。因此,要么轮询数据库,要么在写入内容时触发事件,以获取新写入的数据并将其放入流中。

如何将数据放置在连续的开放流中这个话题太大了,无法在这里解释。但我建议你阅读官方 reactor 文档中的以下部分:

Programmatically creating a sequence

Processors and Sinks