如何使用 reactor 和 R2dbc 压缩嵌套列表

How to zip nested lists with reactor and R2dbc

我在 postgres 数据库中有 3 个表,我正在使用 R2dbc 以关系方式查询和连接它们。

我有 3 个实体 类(可能不应该是数据 类,但不应该影响示例)

@Entity
@Table(name = "parent", schema = "public", catalog = "Test")
data class MyParentObject(
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Id
    @org.springframework.data.annotation.Id
    @Column(name = "id")
    var id: Int = 0,

    @Transient
    var childData: List<MyChildObject>? = null
)
@Entity
@Table(name = "child", schema = "public", catalog = "Test")
data class MyChildObject(
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Id
    @org.springframework.data.annotation.Id
    @Column(name = "id")
    var id: Int = 0,

    @Column(name = "parent_id")
    var parentId: Int? = null

    @Transient
    var grandchildData: List<MyGrandchildObject>? = null
)
@Entity
@Table(name = "grandchild", schema = "public", catalog = "Test")
data class MyGrandchildObject(
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Id
    @org.springframework.data.annotation.Id
    @Column(name = "id")
    var id: Int = 0
    
    @Column(name = "child_id")
    var childId: Int? = null
)

parent 是 one-to-many 到 child 即 grandchild 的 one-to-many。 parent_id 和 child_id 就像 fkeys。

我有一个 RestController,它可以 return 所有 Parent 数据通过这些方法填充所有 child 数据

fun viewAllParents(): Mono<MutableList<MyParentObject>> =
    parentRepository.findAll()
        .flatMap { Mono.just(it).addChildData(it.id) }
        .collectList()

fun Mono<MyParentObject>.addChildData(id: Int): Mono<MyParentObject> =
    this.zipWith(childRepository.getAllByParentIdEquals(id).collectList())
        .map {
            it.t1.childData = it.t2
            it.t1
        }

我还有另一个 RestController,它可以通过这些方法 return 所有 ChildData 和所有 Grandchild 数据(与上面大致相同)

fun viewAllChildren(): Mono<MutableList<MyChildObject>> =
    childRepository.findAll()
        .flatMap { Mono.just(it).addGrandchildData(it.id) }
        .collectList()

fun Mono<MyChildObject>.addGrandchildData(id: Int): Mono<MyChildObject> =
        this.zipWith(childOfChildRepository.getAllByChildIdEquals(id).collectList())
            .map {
                it.t1.childOfChildData = it.t2
                it.t1
            }

我不能做的,也是我的问题,我如何让 viewAllParents() 也填充 Grandchild 数据。我是否需要将 var grandchildData: List<MyGrandchildObject> 转换为 Flux 并使用来自 grandchildRepository 的新 flux 对其进行压缩?还是我看错了?

任何指点将不胜感激。

我真的很喜欢使用反应堆获取分层数据的挑战。 我不了解 Kotlin,但我尝试使用 java 重现该问题。我无法使用父级 -> 子级 -> 孙级层次结构创建 PostgreSQL table,但我尝试通过 webclient 模拟类似的东西(基本上逻辑将保持不变)。这是我的代码,这是我尝试做的并且能够得到你想要的结果:https://github.com/harryalto/reactive-springwebflux

解决方案的关键在于处理程序代码,我正在使用该代码基于子项列表构建子流并使用它来将所有内容联系在一起

public Flux<Parent> getFamiliesHierarchy() {

        return getAllParents()
            .flatMap(parent ->
                    getAllChildsList(parent.getId())
                            .flatMap(childList -> getChildsWithKids(childList))
                            .map(childList -> parent.toBuilder().children(childList).build()
                            )

            );
    }

完整代码如下

@Component
@Slf4j
public class FamilyHandler {

    @Autowired
    private WebClient webClient;

    public Flux<Parent> getAllParents() {
        return webClient
            .get()
            .uri("parents")
            .retrieve()
            .bodyToFlux(Parent.class);
    }

    public Mono<List<Child>> getAllChildsList(final Integer parentId) {
         ParameterizedTypeReference<List<Child>> childList = 
             new ParameterizedTypeReference<List<Child>>() {};
        return webClient
            .get()
            .uri("childs?parentId=" + parentId)
            .retrieve()
            .bodyToMono(childList);
    }

    public Flux<GrandChild> getGrandKids(final Integer childId) {
         return webClient
            .get()
            .uri("grandKids?childId=" + childId)
            .retrieve()
            .bodyToFlux(GrandChild.class);
    }

    public Mono<List<GrandChild>> getGrandKidsList(final Integer childId) {
         ParameterizedTypeReference<List<GrandChild>> grandKidsList = 
         new ParameterizedTypeReference<List<GrandChild>>() {};
         return webClient
            .get()
            .uri("grandKids?childId=" + childId)
            .retrieve()
            .bodyToMono(grandKidsList);
    }

    private Mono<List<Child>> getChildsWithKids(final List<Child> childList) {
        return Flux.fromIterable(childList).flatMap(child ->
                Mono.zip(Mono.just(child), getGrandKidsList(child.getId()))
                        .map(tuple2 ->        tuple2.getT1().toBuilder().grandChildren(tuple2.getT2()).build())
        ).collectList();
    }

    public Flux<Parent> getFamiliesHierarchy() {

        return getAllParents()
            .flatMap(parent ->
                    getAllChildsList(parent.getId())
                            .flatMap(childList -> getChildsWithKids(childList))
                            .map(childList -> parent.toBuilder().children(childList).build()
                            )

            );
    }

}`

我使用 json-server 模拟服务器

下面是我的db.json文件

  {
       "parents":[
      {
         "id": 1,
         "name" : "Parent1",
         "path":"1"
      },
      {
         "id": 2,
         "name" : "Parent2",
         "path":"2"
      }
     ],
     "childs":[
     {
         "id": 1,
         "parentId": 1,
         "name": "child1Parent1",
         "path":"1.1"
     },
     {
         "id":2,
         "parentId": 1,
         "projectName": "child2Parent1",
         "path":"1.2"

     },
     {
         "id":3,
         "parentId": 2,
         "projectName": "child1Parent2",
         "path":"2.1"

     },
     {
         "id":4,
         "parentId": 2,
         "projectName": "child2Parent2",
         "path":"2.2"

      }
   ],
   "grandKids":[
   {
     "id":1,
     "childId": 2,
     "projectName": "grandKid1child2Parent1",
     "path":"1.2.1"

   },
   {
     "id":3,
     "childId": 2,
     "projectName": "grandKid1child2Parent1",
     "path":"1.2.2"

  },
  {
     "id":2,
     "childId": 4,
     "projectName": "grandKid1child1Parent2",
     "path":"2.2.1"

  },
  {
     "id":4,
     "childId": 4,
     "projectName": "grandKid1child1Parent2",
     "path":"2.2.2"

  },
  {
     "id":5,
     "childId": 3,
     "projectName": "grandKid1child1Parent2",
     "path":"2.1.1"

  }
  
  ]
}

这是我的控制器代码

@RestController
@Slf4j
public class FamilyController {

    @Autowired
    private FamilyHandler familyHandler;

    @GetMapping(FAMILIES_ENDPOINT_V1)
    public Flux<Parent> viewAllParents() {
         return familyHandler.getFamiliesHierarchy();
    }

}

我们可以轻松地改造 r2DBC 存储库的代码。

-- 更新 -- 我能够创建示例数据并使用 R2DBC 创建等效数据 这是 link 到 gist