新实体的多个条件插入在 R2DBC 中产生重复输入错误
Multiple conditional inserts of a new entity gives duplicate entry error in R2DBC
让我们考虑一下这个函数
@Transactional
fun conditionalInsertEntity(dbEntity: DBEntity): Mono<DBEntity> {
return fetchObjectByPublicId(dbEntity.publicId)
.switchIfEmpty {
r2DatabaseClient.insert()
.into(DBEntity::class.java)
.using(Flux.just(dbEntity))
.fetch()
.one()
.map { it["entity_id"] as Long }
.flatMap { fetchObjectById(it) }
}
}
虽然 运行 以上功能具有以下驱动程序代码,但如果列表包含重复项,我会收到重复输入错误。理想情况下它不应该给出那个错误,因为上面的函数已经在处理重复插入的情况!!
val result = Flux.fromIterable(listOf(dbEntity1, dbEntity1, dbEntity2))
.flatMap { conditionalInsertEntity(it) }
.collectList()
.block()
意识到这是使用 flatMap 而不是 concatMap 的问题。
与 flatMap 不同,ConcatMap 按顺序从各个发布者收集结果。 (更多here)
因为我使用了 flatMap,多个发布者认为该实体在数据库中尚不可用
让我们考虑一下这个函数
@Transactional
fun conditionalInsertEntity(dbEntity: DBEntity): Mono<DBEntity> {
return fetchObjectByPublicId(dbEntity.publicId)
.switchIfEmpty {
r2DatabaseClient.insert()
.into(DBEntity::class.java)
.using(Flux.just(dbEntity))
.fetch()
.one()
.map { it["entity_id"] as Long }
.flatMap { fetchObjectById(it) }
}
}
虽然 运行 以上功能具有以下驱动程序代码,但如果列表包含重复项,我会收到重复输入错误。理想情况下它不应该给出那个错误,因为上面的函数已经在处理重复插入的情况!!
val result = Flux.fromIterable(listOf(dbEntity1, dbEntity1, dbEntity2))
.flatMap { conditionalInsertEntity(it) }
.collectList()
.block()
意识到这是使用 flatMap 而不是 concatMap 的问题。 与 flatMap 不同,ConcatMap 按顺序从各个发布者收集结果。 (更多here)
因为我使用了 flatMap,多个发布者认为该实体在数据库中尚不可用