如何处理嵌套的 Flux 和 Monos
How to handle Nested Flux and Monos
我的数据存储在 Cosmos 中,它是分层的,存储在不同的表中。
Hierarchy of Data
OnBoardingDefinition -> List<FeatureOrder>
FeatureStepMappingDefinition -> List<Steps>
OnBoardingStepDefinition -> Step details
当我调用 Cosmos 时,得到的是 Flux<OnBoardingDefinition>、Flux<FeatureStepMappingDefinition> 和 Flux<OnBoardingStepDefinition>。我需要根据 ID 构建一个包含所有详细信息的完整 OnBoarding 对象。
// Load the three document streams. Each call returns an Optional wrapping a Flux of one
// document type; the "---" arguments are placeholders elided in the original snippet.
Optional<Flux<OnBoardingDefinition>> onBoardingDefinitionFlux = cosmosRepository.getCosmosDocuments(---);
Optional<Flux<FeatureStepMappingDefinition>> featureStepMappingDefinitionFlux = cosmosRepository.getCosmosDocuments(---);
Optional<Flux<OnBoardingStepDefinition>> onBoardingStepDefinitionFlux = cosmosRepository.getCosmosDocuments(----);
// NOTE(review): Optional.get() is called without a presence check, so an empty Optional
// throws NoSuchElementException here.
Flux<Flux<Mono<StepResponseDto>>> flux3 = onBoardingDefinitionFlux.get()
// Take the feature-order list of every onboarding definition...
.map(onBoardingDefinition -> onBoardingDefinition.getFeatureOrder())
// ...and flatten it so that each FeatureOrder is emitted individually.
.flatMap(Flux::fromIterable)
// Keep only feature orders whose code is in `features` (defined outside this snippet)
// and whose "required" flag is non-null and true.
.filter(featureOrder -> features.contains(featureOrder.getFeatureCode()) && Objects.nonNull(featureOrder.getRequired()) && featureOrder.getRequired())
// map (not flatMap) keeps each publisher returned by getFromFeature as an element,
// which is what produces the nested Flux<Flux<Mono<...>>> type.
.map(featureOrder ->
{
return getFromFeature(featureStepMappingDefinitionFlux, onBoardingStepDefinitionFlux, featureOrder);
}
);
/**
 * Resolves the steps of a single feature into lazily evaluated step responses.
 *
 * <p>Selects the feature-step mappings whose feature code equals (ignoring case) the
 * code of {@code featureOrder}, flattens their step lists, and turns every step into
 * the {@code Mono} produced by {@code getStepResponseDtoMono}. The inner Monos are
 * emitted as elements and are not subscribed to here.
 *
 * @param featureStepMappingDefinitionFlux source of feature-to-steps mappings; must be present
 * @param onBoardingStepDefinitionFlux source of step definitions, passed through to the step lookup
 * @param featureOrder the feature whose steps are wanted
 * @return one {@code Mono<StepResponseDto>} per step of every matching mapping
 * @throws java.util.NoSuchElementException if {@code featureStepMappingDefinitionFlux} is empty
 */
private Flux<Mono<StepResponseDto>> getFromFeature(Optional<Flux<FeatureStepMappingDefinition>> featureStepMappingDefinitionFlux, Optional<Flux<OnBoardingStepDefinition>> onBoardingStepDefinitionFlux, FeatureOrder featureOrder) {
    return featureStepMappingDefinitionFlux.get()
            // Only the mappings that belong to the requested feature.
            .filter(mapping -> mapping.getFeatureCode().equalsIgnoreCase(featureOrder.getFeatureCode()))
            // One element per step of each matching mapping.
            .flatMap(mapping -> Flux.fromIterable(mapping.getSteps()))
            // Each step becomes a (still unsubscribed) lookup of its definition.
            .map(step -> getStepResponseDtoMono(onBoardingStepDefinitionFlux, step));
}
/**
 * Looks up the response DTO for one step.
 *
 * <p>Scans the step definitions for the first one that is active (flag non-null and
 * true) and whose step code equals (ignoring case) the code of {@code step}, then
 * converts it with {@code getStepResponseDto}.
 *
 * @param onBoardingStepDefinitionFlux source of step definitions; must be present
 * @param step the step whose definition is wanted
 * @return a Mono with the first matching definition converted to a DTO, or an empty
 *         Mono when no definition matches
 * @throws java.util.NoSuchElementException if {@code onBoardingStepDefinitionFlux} is empty
 */
private Mono<StepResponseDto> getStepResponseDtoMono(Optional<Flux<OnBoardingStepDefinition>> onBoardingStepDefinitionFlux, Step step) {
    return onBoardingStepDefinitionFlux.get()
            // Null-safe check of the "active" flag: null counts as inactive.
            .filter(definition -> Boolean.TRUE.equals(definition.getActive()))
            .filter(definition -> definition.getStepCode().equalsIgnoreCase(step.getStepCode()))
            .map(definition -> getStepResponseDto(definition))
            // Only the first match is of interest.
            .next();
}
我需要简化上述代码,以避免嵌套的 Flux 和 Mono。我尝试了多种方法,但都没有成功。下面是我编写的示例代码以及期望的输出:我先用 Java 8 的 Stream 实现了我希望用 Flux 实现的同样逻辑。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
//Sample code
/**
 * Minimal reproducible example: assembles an {@link OnBoardingResponse} from three
 * in-memory collections using plain Java 8 streams.
 *
 * <p>The {@code Flux<List<...>>} variables declared in {@link #main(String[])} show
 * the shape in which the same data is said to arrive from the database; the goal of
 * the example is to obtain the same result from those publishers.
 */
public class MRE {

    /** Full definition of a step: its code, its active flag and its name. */
    public static class Step {
        private String stepCode;
        private Boolean active;
        private String name;

        public Step(String stepCode, Boolean active, String name) {
            this.stepCode = stepCode;
            this.active = active;
            this.name = name;
        }

        public Boolean getActive() {
            return active;
        }

        public void setActive(Boolean active) {
            this.active = active;
        }

        public String getStepCode() {
            return stepCode;
        }

        public void setStepCode(String stepCode) {
            this.stepCode = stepCode;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "Step{" +
                    "stepCode='" + stepCode + '\'' +
                    ", active=" + active +
                    ", name='" + name + '\'' +
                    '}';
        }
    }

    /** Reference to a step by its code, as stored inside a {@link FeatureStepMapping}. */
    public static class Steps {
        private String stepCode;

        public Steps(String stepCode) {
            this.stepCode = stepCode;
        }

        public String getStepCode() {
            return stepCode;
        }

        public void setStepCode(String stepCode) {
            this.stepCode = stepCode;
        }

        @Override
        public String toString() {
            return "Steps{" +
                    "stepCode='" + stepCode + '\'' +
                    '}';
        }
    }

    /** Associates a feature code with the step references that belong to it. */
    public static class FeatureStepMapping {
        private String featureCode;
        // Stored by reference: main() adds entries through getSteps() after construction.
        private List<Steps> steps;

        public FeatureStepMapping(String featureCode, List<Steps> steps) {
            this.featureCode = featureCode;
            this.steps = steps;
        }

        public String getFeatureCode() {
            return featureCode;
        }

        public void setFeatureCode(String featureCode) {
            this.featureCode = featureCode;
        }

        public List<Steps> getSteps() {
            return steps;
        }

        public void setSteps(List<Steps> steps) {
            this.steps = steps;
        }

        @Override
        public String toString() {
            return "FeatureStepMapping{" +
                    "featureCode='" + featureCode + '\'' +
                    ", steps=" + steps +
                    '}';
        }
    }

    /** Step data exposed in the response: code and name only. */
    public static class StepResponse {
        private String stepCode;
        private String name;

        public StepResponse(String stepCode, String name) {
            this.stepCode = stepCode;
            this.name = name;
        }

        public String getStepCode() {
            return stepCode;
        }

        public void setStepCode(String stepCode) {
            this.stepCode = stepCode;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "StepResponse{" +
                    "stepCode='" + stepCode + '\'' +
                    ", name='" + name + '\'' +
                    '}';
        }
    }

    /** Onboarding definition: the list of feature codes it consists of. */
    public static class OnBoarding {
        private List<String> featureCodes;

        public OnBoarding(List<String> featureCodes) {
            this.featureCodes = featureCodes;
        }

        public List<String> getFeatureCodes() {
            return featureCodes;
        }

        public void setFeatureCodes(List<String> featureCodes) {
            this.featureCodes = featureCodes;
        }

        @Override
        public String toString() {
            return "OnBoarding{" +
                    "featureCodes=" + featureCodes +
                    '}';
        }
    }

    /** Final result: an id plus the resolved step responses. */
    public static class OnBoardingResponse {
        private String id;
        private List<StepResponse> stepResponses;

        public OnBoardingResponse(String id, List<StepResponse> stepResponses) {
            this.id = id;
            this.stepResponses = stepResponses;
        }

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public List<StepResponse> getStepResponses() {
            return stepResponses;
        }

        public void setStepResponses(List<StepResponse> stepResponses) {
            this.stepResponses = stepResponses;
        }

        @Override
        public String toString() {
            return "OnBoardingResponse{" +
                    "id='" + id + '\'' +
                    ", stepResponses=" + stepResponses +
                    '}';
        }
    }

    /**
     * Builds the sample data, resolves feature codes to step definitions with plain
     * Java 8 streams, and prints the resulting {@link OnBoardingResponse}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Step step1 = new Step("S1", true, "Step1");
        Step step2 = new Step("S2", true, "Step2");
        Step step3 = new Step("S3", true, "Step3");
        List<Step> stepList = Arrays.asList(step1, step2, step3);
        Flux<List<Step>> stepFlux = Flux.just(stepList);

        FeatureStepMapping featureStepMapping1 = new FeatureStepMapping("f1", new ArrayList<>());
        featureStepMapping1.getSteps().add(new Steps("S1"));
        featureStepMapping1.getSteps().add(new Steps("S2"));
        FeatureStepMapping featureStepMapping2 = new FeatureStepMapping("f2", new ArrayList<>());
        // S3 belongs to feature f2 (previously it was added to featureStepMapping1 by mistake,
        // which left f2 without any steps).
        featureStepMapping2.getSteps().add(new Steps("S3"));
        List<FeatureStepMapping> featureStepMappingList = Arrays.asList(featureStepMapping1, featureStepMapping2);
        Flux<List<FeatureStepMapping>> stepFeature = Flux.just(featureStepMappingList);

        List<OnBoarding> onBoardingList = Arrays.asList(new OnBoarding(Arrays.asList("f1", "f2")));
        Flux<List<OnBoarding>> onBoardingFlux = Flux.just(onBoardingList);

        // Goal: obtain a Mono<OnBoardingResponse> from the three Flux<List<...>> above.
        // Do not change their type; assume this is how the data comes from the database.
        // They are intentionally unused by the plain Java 8 version below.

        // Plain Java 8 version: feature code -> matching mappings -> step references
        // -> matching step definitions -> step responses.
        List<StepResponse> stepResponses = onBoardingList.stream()
                .flatMap(onBoarding -> onBoarding.getFeatureCodes().stream())
                .flatMap(feature -> featureStepMappingList.stream()
                        .filter(featureStep -> featureStep.getFeatureCode().equalsIgnoreCase(feature))
                        .flatMap(featureStep -> featureStep.getSteps().stream()))
                .flatMap(step -> stepList.stream()
                        .filter(definedStep -> definedStep.getStepCode().equalsIgnoreCase(step.getStepCode())))
                .map(definedStep -> new StepResponse(definedStep.getStepCode(), definedStep.getName()))
                .collect(Collectors.toList());

        OnBoardingResponse onBoardingResponse = new OnBoardingResponse(UUID.randomUUID().toString(), stepResponses);
        System.out.println(onBoardingResponse);
    }
}
//Output
OnBoardingResponse{id='77b647c5-84ae-4fde-8039-ea5deed1a2bd', stepResponses=[StepResponse{stepCode='S1', name='Step1'}, StepResponse{stepCode='S2', name='Step2'}, StepResponse{stepCode='S3', name='Step3'}]}
以你的示例为例,可以像下面这样做来避免嵌套:
// Sample data, modelled as flat Flux<T> sources rather than Flux<List<T>>.
// Tuples and Tuple2 used below come from Reactor's reactor.util.function package.
Flux<Step> stepFlux = Flux.just(
new Step("S1", true, "Step1"),
new Step("S2", true, "Step2"),
new Step("S3", true, "Step3"));
FeatureStepMapping featureStepMapping1 = new FeatureStepMapping("f1", new ArrayList<>());
featureStepMapping1.getSteps().add(new Steps("S1"));
featureStepMapping1.getSteps().add(new Steps("S2"));
FeatureStepMapping featureStepMapping2 = new FeatureStepMapping("f2", new ArrayList<>());
featureStepMapping2.getSteps().add(new Steps("S3"));
Flux<FeatureStepMapping> stepFeature = Flux.just(featureStepMapping1,featureStepMapping2);
Flux<OnBoarding> onBoarding = Flux.just(new OnBoarding(Arrays.asList("f1", "f2")));
onBoarding
.map(OnBoarding::getFeatureCodes)
// Flatten the lists so the flux emits individual feature codes.
.flatMap(Flux::fromIterable)
// Pair every feature code with every stepFeature element. The window publishers
// never complete (Flux.never()), so all left/right combinations are produced;
// this plays the role of the nested loops over lists.
.join(stepFeature,s -> Flux.never(),s -> Flux.never(),Tuples::of)
// Drop the pairs whose mapping does not belong to the feature code.
.filter(t -> t.getT2().getFeatureCode().equalsIgnoreCase(t.getT1()))
// Keep only the mapping from each surviving pair.
.map(Tuple2::getT2)
// Take the list of step references of each mapping...
.map(FeatureStepMapping::getSteps)
// ...and flatten it into individual step references.
.flatMap(Flux::fromIterable)
// Same technique again: pair every step reference with every step definition.
.join(stepFlux,s -> Flux.never(),s -> Flux.never(),Tuples::of)
.filter(t -> t.getT2().getStepCode().equalsIgnoreCase(t.getT1().getStepCode()))
// Keep only the step definition from each matching pair.
.map(Tuple2::getT2)
// Convert each step definition into a step response.
.map(step -> new StepResponse(step.getStepCode(), step.getName()))
// Gather everything into a Mono<List<StepResponse>>.
.collectList()
// Wrap the list in the onboarding response, giving it a random id.
.map(stepList -> new OnBoardingResponse(UUID.randomUUID().toString(),stepList))
// Subscribing here is only to print the result for demonstration; in real code
// the resulting Mono can be returned or composed further instead.
.subscribe(System.out::println);
此外,如果你得到的确实是 Flux<List<>>,可以使用下面的方式将其转换为普通的 Flux:
// Emits the elements of every list individually (Flux<List<T>> -> Flux<T>).
fluxList.flatMap(Flux::fromIterable)
我的数据存储在 Cosmos 中,它是分层的,存储在不同的表中。
Hierarchy of Data
OnBoardingDefinition -> List<FeatureOrder>
FeatureStepMappingDefinition -> List<Steps>
OnBoardingStepDefinition -> Step details
当我调用 Cosmos 时,得到的是 Flux<OnBoardingDefinition>、Flux<FeatureStepMappingDefinition> 和 Flux<OnBoardingStepDefinition>。我需要根据 ID 构建一个包含所有详细信息的完整 OnBoarding 对象。
// Load the three document streams. Each call returns an Optional wrapping a Flux of one
// document type; the "---" arguments are placeholders elided in the original snippet.
Optional<Flux<OnBoardingDefinition>> onBoardingDefinitionFlux = cosmosRepository.getCosmosDocuments(---);
Optional<Flux<FeatureStepMappingDefinition>> featureStepMappingDefinitionFlux = cosmosRepository.getCosmosDocuments(---);
Optional<Flux<OnBoardingStepDefinition>> onBoardingStepDefinitionFlux = cosmosRepository.getCosmosDocuments(----);
// NOTE(review): Optional.get() is called without a presence check, so an empty Optional
// throws NoSuchElementException here.
Flux<Flux<Mono<StepResponseDto>>> flux3 = onBoardingDefinitionFlux.get()
// Take the feature-order list of every onboarding definition...
.map(onBoardingDefinition -> onBoardingDefinition.getFeatureOrder())
// ...and flatten it so that each FeatureOrder is emitted individually.
.flatMap(Flux::fromIterable)
// Keep only feature orders whose code is in `features` (defined outside this snippet)
// and whose "required" flag is non-null and true.
.filter(featureOrder -> features.contains(featureOrder.getFeatureCode()) && Objects.nonNull(featureOrder.getRequired()) && featureOrder.getRequired())
// map (not flatMap) keeps each publisher returned by getFromFeature as an element,
// which is what produces the nested Flux<Flux<Mono<...>>> type.
.map(featureOrder ->
{
return getFromFeature(featureStepMappingDefinitionFlux, onBoardingStepDefinitionFlux, featureOrder);
}
);
/**
 * Resolves the steps of a single feature into lazily evaluated step responses.
 *
 * <p>Selects the feature-step mappings whose feature code equals (ignoring case) the
 * code of {@code featureOrder}, flattens their step lists, and turns every step into
 * the {@code Mono} produced by {@code getStepResponseDtoMono}. The inner Monos are
 * emitted as elements and are not subscribed to here.
 *
 * @param featureStepMappingDefinitionFlux source of feature-to-steps mappings; must be present
 * @param onBoardingStepDefinitionFlux source of step definitions, passed through to the step lookup
 * @param featureOrder the feature whose steps are wanted
 * @return one {@code Mono<StepResponseDto>} per step of every matching mapping
 * @throws java.util.NoSuchElementException if {@code featureStepMappingDefinitionFlux} is empty
 */
private Flux<Mono<StepResponseDto>> getFromFeature(Optional<Flux<FeatureStepMappingDefinition>> featureStepMappingDefinitionFlux, Optional<Flux<OnBoardingStepDefinition>> onBoardingStepDefinitionFlux, FeatureOrder featureOrder) {
    return featureStepMappingDefinitionFlux.get()
            // Only the mappings that belong to the requested feature.
            .filter(mapping -> mapping.getFeatureCode().equalsIgnoreCase(featureOrder.getFeatureCode()))
            // One element per step of each matching mapping.
            .flatMap(mapping -> Flux.fromIterable(mapping.getSteps()))
            // Each step becomes a (still unsubscribed) lookup of its definition.
            .map(step -> getStepResponseDtoMono(onBoardingStepDefinitionFlux, step));
}
/**
 * Looks up the response DTO for one step.
 *
 * <p>Scans the step definitions for the first one that is active (flag non-null and
 * true) and whose step code equals (ignoring case) the code of {@code step}, then
 * converts it with {@code getStepResponseDto}.
 *
 * @param onBoardingStepDefinitionFlux source of step definitions; must be present
 * @param step the step whose definition is wanted
 * @return a Mono with the first matching definition converted to a DTO, or an empty
 *         Mono when no definition matches
 * @throws java.util.NoSuchElementException if {@code onBoardingStepDefinitionFlux} is empty
 */
private Mono<StepResponseDto> getStepResponseDtoMono(Optional<Flux<OnBoardingStepDefinition>> onBoardingStepDefinitionFlux, Step step) {
    return onBoardingStepDefinitionFlux.get()
            // Null-safe check of the "active" flag: null counts as inactive.
            .filter(definition -> Boolean.TRUE.equals(definition.getActive()))
            .filter(definition -> definition.getStepCode().equalsIgnoreCase(step.getStepCode()))
            .map(definition -> getStepResponseDto(definition))
            // Only the first match is of interest.
            .next();
}
我需要简化上述代码,以避免嵌套的 Flux 和 Mono。我尝试了多种方法,但都没有成功。下面是我编写的示例代码以及期望的输出:我先用 Java 8 的 Stream 实现了我希望用 Flux 实现的同样逻辑。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
//Sample code
/**
 * Minimal reproducible example: assembles an {@link OnBoardingResponse} from three
 * in-memory collections using plain Java 8 streams.
 *
 * <p>The {@code Flux<List<...>>} variables declared in {@link #main(String[])} show
 * the shape in which the same data is said to arrive from the database; the goal of
 * the example is to obtain the same result from those publishers.
 */
public class MRE {

    /** Full definition of a step: its code, its active flag and its name. */
    public static class Step {
        private String stepCode;
        private Boolean active;
        private String name;

        public Step(String stepCode, Boolean active, String name) {
            this.stepCode = stepCode;
            this.active = active;
            this.name = name;
        }

        public Boolean getActive() {
            return active;
        }

        public void setActive(Boolean active) {
            this.active = active;
        }

        public String getStepCode() {
            return stepCode;
        }

        public void setStepCode(String stepCode) {
            this.stepCode = stepCode;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "Step{" +
                    "stepCode='" + stepCode + '\'' +
                    ", active=" + active +
                    ", name='" + name + '\'' +
                    '}';
        }
    }

    /** Reference to a step by its code, as stored inside a {@link FeatureStepMapping}. */
    public static class Steps {
        private String stepCode;

        public Steps(String stepCode) {
            this.stepCode = stepCode;
        }

        public String getStepCode() {
            return stepCode;
        }

        public void setStepCode(String stepCode) {
            this.stepCode = stepCode;
        }

        @Override
        public String toString() {
            return "Steps{" +
                    "stepCode='" + stepCode + '\'' +
                    '}';
        }
    }

    /** Associates a feature code with the step references that belong to it. */
    public static class FeatureStepMapping {
        private String featureCode;
        // Stored by reference: main() adds entries through getSteps() after construction.
        private List<Steps> steps;

        public FeatureStepMapping(String featureCode, List<Steps> steps) {
            this.featureCode = featureCode;
            this.steps = steps;
        }

        public String getFeatureCode() {
            return featureCode;
        }

        public void setFeatureCode(String featureCode) {
            this.featureCode = featureCode;
        }

        public List<Steps> getSteps() {
            return steps;
        }

        public void setSteps(List<Steps> steps) {
            this.steps = steps;
        }

        @Override
        public String toString() {
            return "FeatureStepMapping{" +
                    "featureCode='" + featureCode + '\'' +
                    ", steps=" + steps +
                    '}';
        }
    }

    /** Step data exposed in the response: code and name only. */
    public static class StepResponse {
        private String stepCode;
        private String name;

        public StepResponse(String stepCode, String name) {
            this.stepCode = stepCode;
            this.name = name;
        }

        public String getStepCode() {
            return stepCode;
        }

        public void setStepCode(String stepCode) {
            this.stepCode = stepCode;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "StepResponse{" +
                    "stepCode='" + stepCode + '\'' +
                    ", name='" + name + '\'' +
                    '}';
        }
    }

    /** Onboarding definition: the list of feature codes it consists of. */
    public static class OnBoarding {
        private List<String> featureCodes;

        public OnBoarding(List<String> featureCodes) {
            this.featureCodes = featureCodes;
        }

        public List<String> getFeatureCodes() {
            return featureCodes;
        }

        public void setFeatureCodes(List<String> featureCodes) {
            this.featureCodes = featureCodes;
        }

        @Override
        public String toString() {
            return "OnBoarding{" +
                    "featureCodes=" + featureCodes +
                    '}';
        }
    }

    /** Final result: an id plus the resolved step responses. */
    public static class OnBoardingResponse {
        private String id;
        private List<StepResponse> stepResponses;

        public OnBoardingResponse(String id, List<StepResponse> stepResponses) {
            this.id = id;
            this.stepResponses = stepResponses;
        }

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public List<StepResponse> getStepResponses() {
            return stepResponses;
        }

        public void setStepResponses(List<StepResponse> stepResponses) {
            this.stepResponses = stepResponses;
        }

        @Override
        public String toString() {
            return "OnBoardingResponse{" +
                    "id='" + id + '\'' +
                    ", stepResponses=" + stepResponses +
                    '}';
        }
    }

    /**
     * Builds the sample data, resolves feature codes to step definitions with plain
     * Java 8 streams, and prints the resulting {@link OnBoardingResponse}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Step step1 = new Step("S1", true, "Step1");
        Step step2 = new Step("S2", true, "Step2");
        Step step3 = new Step("S3", true, "Step3");
        List<Step> stepList = Arrays.asList(step1, step2, step3);
        Flux<List<Step>> stepFlux = Flux.just(stepList);

        FeatureStepMapping featureStepMapping1 = new FeatureStepMapping("f1", new ArrayList<>());
        featureStepMapping1.getSteps().add(new Steps("S1"));
        featureStepMapping1.getSteps().add(new Steps("S2"));
        FeatureStepMapping featureStepMapping2 = new FeatureStepMapping("f2", new ArrayList<>());
        // S3 belongs to feature f2 (previously it was added to featureStepMapping1 by mistake,
        // which left f2 without any steps).
        featureStepMapping2.getSteps().add(new Steps("S3"));
        List<FeatureStepMapping> featureStepMappingList = Arrays.asList(featureStepMapping1, featureStepMapping2);
        Flux<List<FeatureStepMapping>> stepFeature = Flux.just(featureStepMappingList);

        List<OnBoarding> onBoardingList = Arrays.asList(new OnBoarding(Arrays.asList("f1", "f2")));
        Flux<List<OnBoarding>> onBoardingFlux = Flux.just(onBoardingList);

        // Goal: obtain a Mono<OnBoardingResponse> from the three Flux<List<...>> above.
        // Do not change their type; assume this is how the data comes from the database.
        // They are intentionally unused by the plain Java 8 version below.

        // Plain Java 8 version: feature code -> matching mappings -> step references
        // -> matching step definitions -> step responses.
        List<StepResponse> stepResponses = onBoardingList.stream()
                .flatMap(onBoarding -> onBoarding.getFeatureCodes().stream())
                .flatMap(feature -> featureStepMappingList.stream()
                        .filter(featureStep -> featureStep.getFeatureCode().equalsIgnoreCase(feature))
                        .flatMap(featureStep -> featureStep.getSteps().stream()))
                .flatMap(step -> stepList.stream()
                        .filter(definedStep -> definedStep.getStepCode().equalsIgnoreCase(step.getStepCode())))
                .map(definedStep -> new StepResponse(definedStep.getStepCode(), definedStep.getName()))
                .collect(Collectors.toList());

        OnBoardingResponse onBoardingResponse = new OnBoardingResponse(UUID.randomUUID().toString(), stepResponses);
        System.out.println(onBoardingResponse);
    }
}
//Output
OnBoardingResponse{id='77b647c5-84ae-4fde-8039-ea5deed1a2bd', stepResponses=[StepResponse{stepCode='S1', name='Step1'}, StepResponse{stepCode='S2', name='Step2'}, StepResponse{stepCode='S3', name='Step3'}]}
以你的示例为例,可以像下面这样做来避免嵌套:
// Sample data, modelled as flat Flux<T> sources rather than Flux<List<T>>.
// Tuples and Tuple2 used below come from Reactor's reactor.util.function package.
Flux<Step> stepFlux = Flux.just(
new Step("S1", true, "Step1"),
new Step("S2", true, "Step2"),
new Step("S3", true, "Step3"));
FeatureStepMapping featureStepMapping1 = new FeatureStepMapping("f1", new ArrayList<>());
featureStepMapping1.getSteps().add(new Steps("S1"));
featureStepMapping1.getSteps().add(new Steps("S2"));
FeatureStepMapping featureStepMapping2 = new FeatureStepMapping("f2", new ArrayList<>());
featureStepMapping2.getSteps().add(new Steps("S3"));
Flux<FeatureStepMapping> stepFeature = Flux.just(featureStepMapping1,featureStepMapping2);
Flux<OnBoarding> onBoarding = Flux.just(new OnBoarding(Arrays.asList("f1", "f2")));
onBoarding
.map(OnBoarding::getFeatureCodes)
// Flatten the lists so the flux emits individual feature codes.
.flatMap(Flux::fromIterable)
// Pair every feature code with every stepFeature element. The window publishers
// never complete (Flux.never()), so all left/right combinations are produced;
// this plays the role of the nested loops over lists.
.join(stepFeature,s -> Flux.never(),s -> Flux.never(),Tuples::of)
// Drop the pairs whose mapping does not belong to the feature code.
.filter(t -> t.getT2().getFeatureCode().equalsIgnoreCase(t.getT1()))
// Keep only the mapping from each surviving pair.
.map(Tuple2::getT2)
// Take the list of step references of each mapping...
.map(FeatureStepMapping::getSteps)
// ...and flatten it into individual step references.
.flatMap(Flux::fromIterable)
// Same technique again: pair every step reference with every step definition.
.join(stepFlux,s -> Flux.never(),s -> Flux.never(),Tuples::of)
.filter(t -> t.getT2().getStepCode().equalsIgnoreCase(t.getT1().getStepCode()))
// Keep only the step definition from each matching pair.
.map(Tuple2::getT2)
// Convert each step definition into a step response.
.map(step -> new StepResponse(step.getStepCode(), step.getName()))
// Gather everything into a Mono<List<StepResponse>>.
.collectList()
// Wrap the list in the onboarding response, giving it a random id.
.map(stepList -> new OnBoardingResponse(UUID.randomUUID().toString(),stepList))
// Subscribing here is only to print the result for demonstration; in real code
// the resulting Mono can be returned or composed further instead.
.subscribe(System.out::println);
此外,如果你得到的确实是 Flux<List<>>,可以使用下面的方式将其转换为普通的 Flux:
// Emits the elements of every list individually (Flux<List<T>> -> Flux<T>).
fluxList.flatMap(Flux::fromIterable)