在 Spring 数据 MongoDB 上订阅两次 save() 导致双重插入

Subscribing twice on Spring Data MongoDB save() results in double insert

我们确实遇到了以下我们理解的行为,但是我们想知道它是否是预期的,以及是否有兴趣将其记录为某种陷阱。

我们正在试验 Spring Boot 2/Spring WebFlux 并设置一个小应用程序,基本上有这样的东西(全部缩短):

@PostMapping
public Mono<Todo> addTodos( @RequestBody Person person ) {
    return personService.addPerson( person );
}

服务首先看起来像这样,因为我们想要将添加人员的事件也发布到消息队列中:

public class PersonService {
    public Mono<Person> addPerson( Person person ) {
        Mono<Person> addedPerson = personRepository.save( person );
        addedPerson.subscribe( p -> rabbitTemplate.convertAndSend( "persons", p ) );
        return addedPerson;
    }
}

所以,这样做显然是错误的。 .subscribe() 触发流程,我们假设反应式 REST 控制器在序列化响应数据之前在后台执行相同的操作,从而产生第二个并行流程。最后,我们在数据库的 persons 集合中得到了两个重复的条目。

在这个冗长的介绍之后,最后的问题是:多个订阅者触发多个插入的这种预期行为(基本上,如果您订阅 n 次,您将获得 n 次插入)?

如果是,这可能是初学者需要强调的一个陷阱,特别是如果我们的理解是正确的,反应式 REST 控制器在幕后执行 .subscribe()

您自己得出了描述预期行为的结论。

反应式编程模型在很多方面都不同于命令式编程模型。

命令式编程结合了转换、映射、执行和其他方面。您可以通过创建 conditional/loop 流程、可以 return 值并将值传递给 API 调用的方法调用来表达这些。

响应式编程将正在发生的什么的声明与将要执行的如何分离。使用反应式基础设施的执行分为两部分:反应式序列组合和实际执行。在您的代码中,您只编写反应序列。执行发生在您的代码之外。

当您编写 Publisher 时,生成的 Publisher 包含执行时将发生的事情的声明。 Publisher 并不表示它是否会首先执行,也不表示最终会有多少订阅者订阅。

以上面的例子为例,Mono<Person> PersonRepository.save(…) return是一个出版商:

  1. 将数据从 Person 映射到 Document
  2. Document 保存到 MongoDB 和
  3. 一旦来自 MongoDB 的响应返回
  4. ,就会发出保存的 Person

这是使用特定存储库方法保存数据的方法。创建发布者不执行发布者,发布者不对执行次数有意见。多次调用 .subscribe() 多次执行发布者。

我认为 .subscribe() 不是陷阱。反应式编程模型方法让执行不再受阻。如果您调用 .subscribe().block(),那么您应该有充分的理由这样做。每次在代码中看到 .subscribe().block() 时,您都应该格外注意这样做是否正确。您的执行环境负责订阅 Publishers.


几点观察:

  • RabbitTemplate 是阻塞 API。你不应该混合反应性和阻塞 APIs。如果您没有其他选择,则将阻塞调用卸载给工作人员。在包含阻塞工作的实际运算符之前使用 publishOn(…)Scheduler 或将 ExecutorService/CompletableFutureflatMap(…).
  • 一起使用
  • Mono/Flux 的反应流组合使用 flatMap(…) 运算符。 flatMap(…) 运算符启动最终完成并继续流程的非阻塞子进程。
  • 使用 doOnXXX(…) 运算符(doOnNext(…)doOnSuccess(…)、...)在发布者发出特定信号时进行回调。这些hook方法可以方便的拦截元素非阻塞消费

参考文献: