Flux.compose 和 Flux.transform 的区别?

Difference between Flux.compose and Flux.transform?

正在学习reactive streams,正在做Publishers(Flux),致力于Flux的改造。为此,我得到了 compose 和 transform 方法。

这是我的代码:

private static void composeStream() {
    System.out.println("*********Calling composeStream************");
    Function<Flux<String>, Flux<String>> alterMap = f -> {
                                                                return f.filter(color -> !color.equals("ram"))
                                                                        .map(String::toUpperCase);
                                                            };

    Flux<String> compose = Flux.fromIterable(Arrays.asList("ram", "sam", "kam", "dam"))
                                    .doOnNext(System.out::println)
                                    .compose(alterMap);

    compose.subscribe(d -> System.out.println("Subscriber to Composed AlterMap :"+d));
    System.out.println("-------------------------------------");

}

private static void transformStream() {
    System.out.println("*********Calling transformStream************");
    Function<Flux<String>, Flux<String>> alterMap = f -> f.filter(color -> !color.equals("ram"))
                                                            .map(String::toUpperCase);

    Flux.fromIterable(Arrays.asList("ram", "sam", "kam", "dam"))
            .doOnNext(System.out::println)
            .transform(alterMap)
            .subscribe(d -> System.out.println("Subscriber to Transformed AlterMap: "+d));
    System.out.println("-------------------------------------");
}

这是输出,两种情况都相同:

*********Calling transformStream************
ram
sam
Subscriber to Transformed AlterMap: SAM
kam
Subscriber to Transformed AlterMap: KAM
dam
Subscriber to Transformed AlterMap: DAM
-------------------------------------
*********Calling composeStream************
ram
sam
Subscriber to Composed AlterMap :SAM
kam
Subscriber to Composed AlterMap :KAM
dam
Subscriber to Composed AlterMap :DAM
-------------------------------------

两者有什么区别? 请推荐

根据文档:

Transform this Flux in order to generate a target Flux. Unlike Flux#compose(Function), the provided function is executed as part of assembly.

这是什么意思?

如果我们像下面这样写一个小测试:

 int[] counter = new int[1];
 Function transformer  = f -> {
     counter[0]++;

     return f;
 }

 Flux flux = flux Flux.just("")
     .transform(transformer);

 System.out.println(counter[0]);

 flux.subscribe();
 flux.subscribe();
 flux.subscribe();
 System.out.println(counter[0]);

在输出中我们将观察下一个结果:

 1
 1

也就是说在组装管道的过程中会执行一次transform函数,也就是说transform函数会被执行eagerly .

反过来,.compose 我们将获得相同代码的下一个行为

 int[] counter = new int[1];
 Function transformer  = f -> {
     counter[0]++;

     return f;
 }

 Flux flux = flux Flux.just("")
     .compose(transformer);

 System.out.println(counter[0]);

 flux.subscribe();
 flux.subscribe();
 flux.subscribe();
 System.out.println(counter[0]);

并输出

 0
 3

这意味着对于每个订阅者转换函数将被单独执行,我们可以将这种执行视为lazy