Spring 带有 ReactiveMongoRepository 的 WebFlux:未通过流获取数据库更新
Spring WebFlux with ReactiveMongoRepository: not getting database updates through the stream
我面临的问题:
无论我尝试使用 Spring Reactive (WebFlux) REST API 的各种教程,我都无法让它工作。当我最初调用我的端点时,我能够从 MongoDB 集合中获取结果。但是,每当我更新文档条目或添加新文档时,它都不会通过文本事件流进行更新。每次我都必须再次调用端点以获得新结果。
设置:
目前我有以下设置:
- Spring云网关(后面我运行各种)
- Spring 包含 RestControllers 的启动服务(称之为主要服务)
我正在使用 Spring Webflux、Spring 云网关和 Spring ReactiveMongoRepository。
Spring 引导主服务的依赖项包括 pom.xml:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-oauth2-jose</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
ReactiveMongoRepository 的代码:
@Repository
public interface TestRepository extends ReactiveMongoRepository<TestIntegration, String> {
@Query(("{'userId': ?0}"))
Flux<TestIntegration> findbyUserId(String userId);
}
其余Controller的代码:
@GetMapping(value = "main/integrations", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<TestIntegration> retrieveIntegrations(ServerWebExchange exchange) {
return testRepository.findAll();
}
根据我读过的所有 tutorial/guideline,这应该是按照这种方法工作的。
有没有人也经历过这种情况或者可以帮助解决这个问题?目前坚持了几天...
这按预期工作。当您从 Flux<T>
流式传输项目时,只要流中有项目,它就会提供。然后流将关闭。因此,在您的情况下,它从数据库中获取所有数据,将其流式传输给您,然后关闭。
如果您希望保持流打开,您需要继续发送数据。一种方法是使用 ServerSentsEvents
发送 :keep alive
消息(以逗号开头的消息)。您可以在 official mozilla documentation.
中阅读有关 ServerSentEvents
和 colon operator
以及这些类型的消息的更多信息
当您能够实际保持流打开并希望发送数据时,您的服务将不知道新数据何时写入数据库。因此,要么轮询数据库,要么在写入内容时触发事件,以获取新写入的数据并将其放入流中。
如何将数据放置在连续的开放流中这个话题太大了,无法在这里解释。但我建议你阅读官方 reactor 文档中的以下部分:
我面临的问题:
无论我尝试使用 Spring Reactive (WebFlux) REST API 的各种教程,我都无法让它工作。当我最初调用我的端点时,我能够从 MongoDB 集合中获取结果。但是,每当我更新文档条目或添加新文档时,它都不会通过文本事件流进行更新。每次我都必须再次调用端点以获得新结果。
设置:
目前我有以下设置:
- Spring云网关(后面我运行各种)
- Spring 包含 RestControllers 的启动服务(称之为主要服务)
我正在使用 Spring Webflux、Spring 云网关和 Spring ReactiveMongoRepository。
Spring 引导主服务的依赖项包括 pom.xml:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-oauth2-jose</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
ReactiveMongoRepository 的代码:
@Repository
public interface TestRepository extends ReactiveMongoRepository<TestIntegration, String> {
@Query(("{'userId': ?0}"))
Flux<TestIntegration> findbyUserId(String userId);
}
其余Controller的代码:
@GetMapping(value = "main/integrations", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<TestIntegration> retrieveIntegrations(ServerWebExchange exchange) {
return testRepository.findAll();
}
根据我读过的所有 tutorial/guideline,这应该是按照这种方法工作的。 有没有人也经历过这种情况或者可以帮助解决这个问题?目前坚持了几天...
这按预期工作。当您从 Flux<T>
流式传输项目时,只要流中有项目,它就会提供。然后流将关闭。因此,在您的情况下,它从数据库中获取所有数据,将其流式传输给您,然后关闭。
如果您希望保持流打开,您需要继续发送数据。一种方法是使用 ServerSentsEvents
发送 :keep alive
消息(以逗号开头的消息)。您可以在 official mozilla documentation.
ServerSentEvents
和 colon operator
以及这些类型的消息的更多信息
当您能够实际保持流打开并希望发送数据时,您的服务将不知道新数据何时写入数据库。因此,要么轮询数据库,要么在写入内容时触发事件,以获取新写入的数据并将其放入流中。
如何将数据放置在连续的开放流中这个话题太大了,无法在这里解释。但我建议你阅读官方 reactor 文档中的以下部分: