为什么在 Spring 反应式 Mongo 中订阅有效而阻止无效?

Why does subscribe work and block doesn't in Spring reactive Mongo?

我通过选择 KotlinGradleM7Web-reactive.

创建了一个刚脱离 Spring Initializr 的项目

我做了一个小项目:

data class Person (val id: String)

@Component class PersonHandler(val template: ReactiveMongoTemplate) 
{
    init
    {
        println("Initializing")

        val jim: Mono<Person> =  template.save(Person("Jim"))
        val john: Mono<Person> = template.save(Person("John"))
        val jack: Mono<Person> = template.save(Person("Jack"))

        launch(jim)
        launch(john)
        launch(jack)

        println("Finished Initializing")
    }

    fun launch(mono: Mono<Person>)
    {
        mono.subscribe({println(it.id)}, {println("Error")}) // This works
        // mono.block()  This just hangs
    } 
}

我尝试将三个人保存到数据库中。 save 方法 returns 只是一个需要执行的 Mono。如果我尝试通过简单的订阅来执行它,一切都很好:

Initializing
Finished Initializing
2017-12-21 13:14:39.513  INFO 17278 --- [      Thread-13] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:3, serverValue:158}] to localhost:27017
2017-12-21 13:14:39.515  INFO 17278 --- [      Thread-12] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:4, serverValue:159}] to localhost:27017
2017-12-21 13:14:39.520  INFO 17278 --- [      Thread-14] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:5, serverValue:160}] to localhost:27017
Jim
Jack
John

但是,当我使用 block 而不是 subscribe 时,应用程序挂起:

Initializing
2017-12-21 13:16:47.200  INFO 17463 --- [      Thread-14] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:3, serverValue:163}] to localhost:27017

如果我手动查询数据库,我会看到 Jim 已被保存,但 Jack 和 John 没有。

这是一个错误,还是我做错了什么?我想在代码继续之前保证用户在数据库中,所以我真的很想使用 block.

我不确定它是否相关,但我收到编译器警告

Accessing nonfinal property template in constructor

有一个最小的工作示例。它包含两个分支。一种是该问题的解决方法。

https://github.com/martin-drozdik/spring-mongo-bug-example

我认为这可能是 Spring 框架错误/可用性问题。

首先,让我强调一下subscribeblock的区别:

  • subscribe 方法开始工作,returns 立即开始。因此,当应用程序的其他部分 运行.
  • 时,您无法保证操作已完成
  • block是一个阻塞操作:它触发操作并等待它完成。

对于初始化工作,组合操作和调用块一次可能是最好的选择:

val jim: Mono<Person> =  template.save(Person("Jim"))
val john: Mono<Person> = template.save(Person("John"))
val jack: Mono<Person> = template.save(Person("Jack"))
jim.then(john).then(jack).block();

如您所述,使用 block 会挂起应用程序。我怀疑这可能是一个 Spring 上下文初始化问题 - 如果我没记错的话,这个过程可能会在某些部分假设一个线程并使用反应管道在那里安排许多线程的工作。

您能否创建一个最小的示例应用程序(仅使用 Java/Spring Boot/Spring Data Reactive Mongo)并在 https://jira.spring.io 上报告?

我遇到过类似的情况,调用 "reactiveMongoTemplate.save(model).block()" 应用程序挂起。

问题是由我的 类 之一中的 @PostConstruct 引起的,该问题旨在在应用程序初始化后创建我的系统用户。我想不知何故它是在完全 Spring 上下文初始化之前调用的。

@Configuration
public class InitialDataPostLoader  {
    private Logger logger = LogManager.getLogger(this.getClass());


    @PostConstruct
    public void init() {
        logger.info(String.format(MSG_SERVICE_JOB, "System Metadata initialization"));
        createDefaultUsers();
    }

通过将 @PostConstruct 替换为 ContextRefreshEvent 侦听器,问题得到解决。

@Configuration 
public class InitialDataPostLoader implements
 ApplicationListener<ContextRefreshedEvent> {
     private Logger logger = LogManager.getLogger(this.getClass());

     @Override
     public void onApplicationEvent(ContextRefreshedEvent arg0) {

         logger.info(String.format(MSG_SERVICE_JOB, "System Metadata initialization"));
         createDefaultUsers();

     }