链接多个发布者转换响应

Chaining multiple publishers transforming the responses

我想在每次通量事件后链接一个单声道。 mono 发布者将需要来自 flux 发布的每个事件的信息。响应应该是具有通量事件和单声道响应数据的通量。

经过挖掘,我最终得到了一个 flatMap 中的地图。代码如下所示:

override fun searchPets(petSearch: PetSearch): Flux<Pet> {
    return petRepository
        .searchPets(petSearch) // returns Flux<pet>
        .flatMap { pet -> 
            petService
            .getCollarForMyPet() // returns Mono<collar>
            .map { collar -> PetConverter.addCollarToPet(pet, collar) } //returns pet (now with with collar)
        }
}

我主要担心的是:

这种方法非常好。

Reactive Streams 规范要求 onNext 事件不重叠,因此竞争条件不会出现问题。

flatMap 虽然引入了并发性,因此对 PetService 的多次调用将并行 运行。这应该不是问题,除非 searchPets 发出两次 Pet 的实例。

不是因为这种并发性,flatMap 可以在这种情况下对宠物重新排序。想象一下搜索 returns petA 然后 petB,但是 petService 调用 petA 需要更长的时间。在 flatMap 的输出中,petB 将首先被发射(带套环),然后是 petA.