将通量的通量映射到具有列表列表列表的对象

map Flux of Fluxes of Fluxes to an object with List of Lists of Lists

我正在抓取 API 并构建以下数据结构:

public class Organization {
    String id;
    String name;
    Flux<Space> spaces;
}

public class Space {
    String id;
    String name;
    Flux<Application> applications;
}

public class Application {
    String id;
    String name;
    Flux<Process> processes;
}

public class Process {
    String processId;
    Integer instances;
}

现在,我想保留 class Organization 的对象。 为此,我为所有前缀为 class 的 classes 创建了像 PersistedOrganizationList<Space> spaces 而不是 Fluxes,现在必须映射我的对象在以下代码段中接收:

cfOperations.organizations().list()
                .parallel(2)
                .runOn(Schedulers.parallel())
                .map(this::processOrganization)
                .subscribe(this::persistOrganization);

我已经订阅了最初的 Flux,现在可以访问 Organization 的对象。

不幸的是,我迷路了,看不出如何将那些嵌套的 Fluxes 解包到列表中。我已经尝试使用 Mono<List<Space>>,但由于我的知识有限,这似乎也没有解决问题。

我试过 flatMap() 但这最后给了我 Processes 的单个对象,看起来我失去了中间结果,比如 SpaceApplication

感谢您提供的任何启发。对我而言,重要的是在末尾保留一个根对象 (Organization),而不仅仅是少数叶子 (Process),这将直接与 .subscribe() 一起用于每个后续Flux.

你会发现这里的困难是没有办法从 Flux 创建 List 而不阻塞沿线的某处,这在某种程度上破坏了使用反应流的意义。您可能会更好地调整持久性 API,以便它可以直接处理 Flux 对象,而不仅仅是标准的 Java 集合。

也就是说,如果您仍然真的想这样做,我建议您在 [=30= 的 PersistedX 集中创建新的构造函数],所以它们看起来像下面这样:

class PersistedOrganization {

    String id;
    String name;
    List<PersistedSpace> spaces;

    public PersistedOrganization(Organization org) {
        this.id = org.id;
        this.name = org.name;
        this.spaces = org.spaces.collectList().block().stream().map(PersistedSpace::new);
    }
}

(其他 类 依此类推。)

完成后,坚持整个链条归结为:

List<PersistedOrganization> pl = orgList.stream().map(PersistedOrganization::new).collect(Collectors.toList());

...但是和以前一样,了解这将在每一步都阻塞,因此远非理想。

这是我没有阻塞的情况;我将层次结构缩短了 1,以使其更具可读性,但扩展它是微不足道的。基本上它只是每个级别重复的 flatMap - collectList - map 序列。

cfOperations.organizations().list()
        .flatMap(organization -> organization.spaces
                .flatMap(space -> space.applications
                        .map(application -> new PersistedApplication(application.id, application.name))
                        .collectList()
                        .map(persistedApplications -> new PersistedSpace(space.id, space.name, persistedApplications))
                )
                .collectList()
                .map(persistedSpaces -> new PersistedOrganization(organization.id, organization.name, persistedSpaces))
        )
        .subscribe(this::persistOrganization);

无论如何,我不认为使用 Fluxes 作为属性创建 POJO 是个好主意。例如,在订阅 cold Flux 之后,它的所有元素对于其他订阅者都是 'lost' - 这不是我期望 POJO 工作的方式;)太多的不确定性。