如何展开多个 Flux 并保留原件?

How to unroll multiple Flux and keep the originals?

我想生成两种对象类型的交错流。

我有两个辅助方法

总体结果应该是 Flux<JsonNode> 以适应两种对象类型的混合。

所以,结果会是这样的

[ {'name':'Aaa-X'}, 
    {'name':'Bbb-x1'},
    {'name':'Bbb-x2'},
  {'name':'Aaa-Y'},
    {'name':'Bbb-y1'},
    {'name':'Bbb-y2'},
    {'name':'Bbb-y3'}
]

作为粗略的草图,我尝试了这个:

final ObjectMapper om = new ObjectMapper();

public Flux<JsonNode> create() {
  return Flux.range(0, 2)       // create 2
    .map( idx -> genAaa() )     // bare Aaa's
    .flatMap( a -> genBbbs(a) ) // bare Aaa to Flux<Bbb> ???
    .map( om::valueToTree );    // anything to JsonNode
}

但是我这里有几个大问题:

因为我转换了 Aaa 对象(并因此消耗了它们)它们不再出现在结果中。我不知道如何在这种情况下“使用”和保留它们。

我在想是否可以将“进行中的通量”作为参数传递给生成函数,这样它们每个都在创建时添加 JsonNodes,但这感觉不对(完全不是异步的)无论如何我现在都不会。我想 Fluxes 中有一个概念我仍然没有理解。

您可以在传递给 flatMap 的函数中将 Flux#concatgenBbbs 方法一起使用:

private static Flux<JsonNode> combine() {
    ObjectMapper objectMapper = new ObjectMapper();
    return Flux.concat(getAaa("a0"), getAaa("a1"))// Flux<Aaa>
            .flatMap(aaa -> Flux.concat(Mono.just(aaa), getBbbs(aaa))) // Flux<Object>
            .map(objectMapper::valueToTree); // Flux<JsonNode>
}

concat 方法只是连接两个来源:

  1. 使用Mono.just
  2. 人工创建
  3. 来自 getBbbs(aaa) 调用的 Flux<B>

示例输出:

{"name":"a0"}
{"name":"B1-a0"}
{"name":"B2-a0"}
{"name":"a1"}
{"name":"B1-a1"}
{"name":"B2-a1"}

完整列表:

public class Main {
    @AllArgsConstructor
    @Data
    private static class Aaa {
        private String name;
    }

    @AllArgsConstructor
    @Data
    private static class Bbb {
        private String name;
    }

    private static Mono<Aaa> getAaa(String name) {
        return Mono.just(new Aaa(name));
    }

    private static Flux<Bbb> getBbbs(Aaa aaa) {
        return Flux.just(new Bbb("B1-" + aaa.getName()), new Bbb("B2-" + aaa.getName()));
    }


    public static void main(String[] args) {
        combine().subscribe(System.out::println);
    }

    private static Flux<JsonNode> combine() {
        ObjectMapper objectMapper = new ObjectMapper();
        return Flux.concat(getAaa("a0"), getAaa("a1"))// Flux<Aaa>
                .flatMap(aaa -> Flux.concat(Mono.just(aaa), getBbbs(aaa))) // Flux<Object>
                .map(objectMapper::valueToTree); // Flux<JsonNode>
    }
}