新实体的多个条件插入在 R2DBC 中产生重复输入错误

Multiple conditional inserts of a new entity gives duplicate entry error in R2DBC

让我们考虑一下这个函数

@Transactional
fun conditionalInsertEntity(dbEntity: DBEntity): Mono<DBEntity> {
    return fetchObjectByPublicId(dbEntity.publicId)
        .switchIfEmpty {
            r2DatabaseClient.insert()
                .into(DBEntity::class.java)
                .using(Flux.just(dbEntity))
                .fetch()
                .one()
                .map { it["entity_id"] as Long }
                .flatMap { fetchObjectById(it) }
        }
}

虽然 运行 以上功能具有以下驱动程序代码,但如果列表包含重复项,我会收到重复输入错误。理想情况下它不应该给出那个错误,因为上面的函数已经在处理重复插入的情况!!

val result = Flux.fromIterable(listOf(dbEntity1, dbEntity1, dbEntity2))
    .flatMap { conditionalInsertEntity(it) }
    .collectList()
    .block()

意识到这是使用 flatMap 而不是 concatMap 的问题。 与 flatMap 不同,ConcatMap 按顺序从各个发布者收集结果。 (更多here

因为我使用了 flatMap,多个发布者认为该实体在数据库中尚不可用