在 Spring 数据 MongoDB 上订阅两次 save() 导致双重插入
Subscribing twice on Spring Data MongoDB save() results in double insert
我们确实遇到了以下我们理解的行为,但是我们想知道它是否是预期的,以及是否有兴趣将其记录为某种陷阱。
我们正在试验 Spring Boot 2/Spring WebFlux 并设置一个小应用程序,基本上有这样的东西(全部缩短):
@PostMapping
public Mono<Todo> addTodos( @RequestBody Person person ) {
return personService.addPerson( person );
}
服务首先看起来像这样,因为我们想要将添加人员的事件也发布到消息队列中:
public class PersonService {
public Mono<Person> addPerson( Person person ) {
Mono<Person> addedPerson = personRepository.save( person );
addedPerson.subscribe( p -> rabbitTemplate.convertAndSend( "persons", p ) );
return addedPerson;
}
}
所以,这样做显然是错误的。 .subscribe()
触发流程,我们假设反应式 REST 控制器在序列化响应数据之前在后台执行相同的操作,从而产生第二个并行流程。最后,我们在数据库的 persons
集合中得到了两个重复的条目。
在这个冗长的介绍之后,最后的问题是:多个订阅者触发多个插入的这种预期行为(基本上,如果您订阅 n
次,您将获得 n
次插入)?
如果是,这可能是初学者需要强调的一个陷阱,特别是如果我们的理解是正确的,反应式 REST 控制器在幕后执行 .subscribe()
。
您自己得出了描述预期行为的结论。
反应式编程模型在很多方面都不同于命令式编程模型。
命令式编程结合了转换、映射、执行和其他方面。您可以通过创建 conditional/loop 流程、可以 return 值并将值传递给 API 调用的方法调用来表达这些。
响应式编程将正在发生的什么的声明与将要执行的如何分离。使用反应式基础设施的执行分为两部分:反应式序列组合和实际执行。在您的代码中,您只编写反应序列。执行发生在您的代码之外。
当您编写 Publisher
时,生成的 Publisher
包含执行时将发生的事情的声明。 Publisher
并不表示它是否会首先执行,也不表示最终会有多少订阅者订阅。
以上面的例子为例,Mono<Person> PersonRepository.save(…)
return是一个出版商:
- 将数据从
Person
映射到 Document
- 将
Document
保存到 MongoDB 和
- 一旦来自 MongoDB 的响应返回
,就会发出保存的 Person
这是使用特定存储库方法保存数据的方法。创建发布者不执行发布者,发布者不对执行次数有意见。多次调用 .subscribe()
多次执行发布者。
我认为 .subscribe()
不是陷阱。反应式编程模型方法让执行不再受阻。如果您调用 .subscribe()
或 .block()
,那么您应该有充分的理由这样做。每次在代码中看到 .subscribe()
或 .block()
时,您都应该格外注意这样做是否正确。您的执行环境负责订阅 Publisher
s.
几点观察:
RabbitTemplate
是阻塞 API。你不应该混合反应性和阻塞 APIs。如果您没有其他选择,则将阻塞调用卸载给工作人员。在包含阻塞工作的实际运算符之前使用 publishOn(…)
和 Scheduler
或将 ExecutorService
/CompletableFuture
与 flatMap(…)
. 一起使用
- 对
Mono
/Flux
的反应流组合使用 flatMap(…)
运算符。 flatMap(…)
运算符启动最终完成并继续流程的非阻塞子进程。
- 使用
doOnXXX(…)
运算符(doOnNext(…)
、doOnSuccess(…)
、...)在发布者发出特定信号时进行回调。这些hook方法可以方便的拦截元素非阻塞消费
参考文献:
我们确实遇到了以下我们理解的行为,但是我们想知道它是否是预期的,以及是否有兴趣将其记录为某种陷阱。
我们正在试验 Spring Boot 2/Spring WebFlux 并设置一个小应用程序,基本上有这样的东西(全部缩短):
@PostMapping
public Mono<Todo> addTodos( @RequestBody Person person ) {
return personService.addPerson( person );
}
服务首先看起来像这样,因为我们想要将添加人员的事件也发布到消息队列中:
public class PersonService {
public Mono<Person> addPerson( Person person ) {
Mono<Person> addedPerson = personRepository.save( person );
addedPerson.subscribe( p -> rabbitTemplate.convertAndSend( "persons", p ) );
return addedPerson;
}
}
所以,这样做显然是错误的。 .subscribe()
触发流程,我们假设反应式 REST 控制器在序列化响应数据之前在后台执行相同的操作,从而产生第二个并行流程。最后,我们在数据库的 persons
集合中得到了两个重复的条目。
在这个冗长的介绍之后,最后的问题是:多个订阅者触发多个插入的这种预期行为(基本上,如果您订阅 n
次,您将获得 n
次插入)?
如果是,这可能是初学者需要强调的一个陷阱,特别是如果我们的理解是正确的,反应式 REST 控制器在幕后执行 .subscribe()
。
您自己得出了描述预期行为的结论。
反应式编程模型在很多方面都不同于命令式编程模型。
命令式编程结合了转换、映射、执行和其他方面。您可以通过创建 conditional/loop 流程、可以 return 值并将值传递给 API 调用的方法调用来表达这些。
响应式编程将正在发生的什么的声明与将要执行的如何分离。使用反应式基础设施的执行分为两部分:反应式序列组合和实际执行。在您的代码中,您只编写反应序列。执行发生在您的代码之外。
当您编写 Publisher
时,生成的 Publisher
包含执行时将发生的事情的声明。 Publisher
并不表示它是否会首先执行,也不表示最终会有多少订阅者订阅。
以上面的例子为例,Mono<Person> PersonRepository.save(…)
return是一个出版商:
- 将数据从
Person
映射到Document
- 将
Document
保存到 MongoDB 和 - 一旦来自 MongoDB 的响应返回 ,就会发出保存的
Person
这是使用特定存储库方法保存数据的方法。创建发布者不执行发布者,发布者不对执行次数有意见。多次调用 .subscribe()
多次执行发布者。
我认为 .subscribe()
不是陷阱。反应式编程模型方法让执行不再受阻。如果您调用 .subscribe()
或 .block()
,那么您应该有充分的理由这样做。每次在代码中看到 .subscribe()
或 .block()
时,您都应该格外注意这样做是否正确。您的执行环境负责订阅 Publisher
s.
几点观察:
RabbitTemplate
是阻塞 API。你不应该混合反应性和阻塞 APIs。如果您没有其他选择,则将阻塞调用卸载给工作人员。在包含阻塞工作的实际运算符之前使用publishOn(…)
和Scheduler
或将ExecutorService
/CompletableFuture
与flatMap(…)
. 一起使用
- 对
Mono
/Flux
的反应流组合使用flatMap(…)
运算符。flatMap(…)
运算符启动最终完成并继续流程的非阻塞子进程。 - 使用
doOnXXX(…)
运算符(doOnNext(…)
、doOnSuccess(…)
、...)在发布者发出特定信号时进行回调。这些hook方法可以方便的拦截元素非阻塞消费
参考文献: