将通量的通量映射到具有列表列表列表的对象
map Flux of Fluxes of Fluxes to an object with List of Lists of Lists
我正在抓取 API 并构建以下数据结构:
public class Organization {
String id;
String name;
Flux<Space> spaces;
}
public class Space {
String id;
String name;
Flux<Application> applications;
}
public class Application {
String id;
String name;
Flux<Process> processes;
}
public class Process {
String processId;
Integer instances;
}
现在,我想保留 class Organization
的对象。
为此,我为所有前缀为 class 的 classes 创建了像 PersistedOrganization
和 List<Space> spaces
而不是 Flux
es,现在必须映射我的对象在以下代码段中接收:
cfOperations.organizations().list()
.parallel(2)
.runOn(Schedulers.parallel())
.map(this::processOrganization)
.subscribe(this::persistOrganization);
我已经订阅了最初的 Flux
,现在可以访问 Organization
的对象。
不幸的是,我迷路了,看不出如何将那些嵌套的 Fluxes 解包到列表中。我已经尝试使用 Mono<List<Space>>
,但由于我的知识有限,这似乎也没有解决问题。
我试过 flatMap()
但这最后给了我 Process
es 的单个对象,看起来我失去了中间结果,比如 Space
和 Application
感谢您提供的任何启发。对我而言,重要的是在末尾保留一个根对象 (Organization
),而不仅仅是少数叶子 (Process
),这将直接与 .subscribe()
一起用于每个后续Flux
.
你会发现这里的困难是没有办法从 Flux
创建 List
而不阻塞沿线的某处,这在某种程度上破坏了使用反应流的意义。您可能会更好地调整持久性 API,以便它可以直接处理 Flux
对象,而不仅仅是标准的 Java 集合。
也就是说,如果您仍然真的想这样做,我建议您在 [=30= 的 PersistedX
集中创建新的构造函数],所以它们看起来像下面这样:
class PersistedOrganization {
String id;
String name;
List<PersistedSpace> spaces;
public PersistedOrganization(Organization org) {
this.id = org.id;
this.name = org.name;
this.spaces = org.spaces.collectList().block().stream().map(PersistedSpace::new);
}
}
(其他 类 依此类推。)
完成后,坚持整个链条归结为:
List<PersistedOrganization> pl = orgList.stream().map(PersistedOrganization::new).collect(Collectors.toList());
...但是和以前一样,了解这将在每一步都阻塞,因此远非理想。
这是我没有阻塞的情况;我将层次结构缩短了 1,以使其更具可读性,但扩展它是微不足道的。基本上它只是每个级别重复的 flatMap
- collectList
- map
序列。
cfOperations.organizations().list()
.flatMap(organization -> organization.spaces
.flatMap(space -> space.applications
.map(application -> new PersistedApplication(application.id, application.name))
.collectList()
.map(persistedApplications -> new PersistedSpace(space.id, space.name, persistedApplications))
)
.collectList()
.map(persistedSpaces -> new PersistedOrganization(organization.id, organization.name, persistedSpaces))
)
.subscribe(this::persistOrganization);
无论如何,我不认为使用 Fluxes 作为属性创建 POJO 是个好主意。例如,在订阅 cold Flux 之后,它的所有元素对于其他订阅者都是 'lost' - 这不是我期望 POJO 工作的方式;)太多的不确定性。
我正在抓取 API 并构建以下数据结构:
public class Organization {
String id;
String name;
Flux<Space> spaces;
}
public class Space {
String id;
String name;
Flux<Application> applications;
}
public class Application {
String id;
String name;
Flux<Process> processes;
}
public class Process {
String processId;
Integer instances;
}
现在,我想保留 class Organization
的对象。
为此,我为所有前缀为 class 的 classes 创建了像 PersistedOrganization
和 List<Space> spaces
而不是 Flux
es,现在必须映射我的对象在以下代码段中接收:
cfOperations.organizations().list()
.parallel(2)
.runOn(Schedulers.parallel())
.map(this::processOrganization)
.subscribe(this::persistOrganization);
我已经订阅了最初的 Flux
,现在可以访问 Organization
的对象。
不幸的是,我迷路了,看不出如何将那些嵌套的 Fluxes 解包到列表中。我已经尝试使用 Mono<List<Space>>
,但由于我的知识有限,这似乎也没有解决问题。
我试过 flatMap()
但这最后给了我 Process
es 的单个对象,看起来我失去了中间结果,比如 Space
和 Application
感谢您提供的任何启发。对我而言,重要的是在末尾保留一个根对象 (Organization
),而不仅仅是少数叶子 (Process
),这将直接与 .subscribe()
一起用于每个后续Flux
.
你会发现这里的困难是没有办法从 Flux
创建 List
而不阻塞沿线的某处,这在某种程度上破坏了使用反应流的意义。您可能会更好地调整持久性 API,以便它可以直接处理 Flux
对象,而不仅仅是标准的 Java 集合。
也就是说,如果您仍然真的想这样做,我建议您在 [=30= 的 PersistedX
集中创建新的构造函数],所以它们看起来像下面这样:
class PersistedOrganization {
String id;
String name;
List<PersistedSpace> spaces;
public PersistedOrganization(Organization org) {
this.id = org.id;
this.name = org.name;
this.spaces = org.spaces.collectList().block().stream().map(PersistedSpace::new);
}
}
(其他 类 依此类推。)
完成后,坚持整个链条归结为:
List<PersistedOrganization> pl = orgList.stream().map(PersistedOrganization::new).collect(Collectors.toList());
...但是和以前一样,了解这将在每一步都阻塞,因此远非理想。
这是我没有阻塞的情况;我将层次结构缩短了 1,以使其更具可读性,但扩展它是微不足道的。基本上它只是每个级别重复的 flatMap
- collectList
- map
序列。
cfOperations.organizations().list()
.flatMap(organization -> organization.spaces
.flatMap(space -> space.applications
.map(application -> new PersistedApplication(application.id, application.name))
.collectList()
.map(persistedApplications -> new PersistedSpace(space.id, space.name, persistedApplications))
)
.collectList()
.map(persistedSpaces -> new PersistedOrganization(organization.id, organization.name, persistedSpaces))
)
.subscribe(this::persistOrganization);
无论如何,我不认为使用 Fluxes 作为属性创建 POJO 是个好主意。例如,在订阅 cold Flux 之后,它的所有元素对于其他订阅者都是 'lost' - 这不是我期望 POJO 工作的方式;)太多的不确定性。