Flux.compose 和 Flux.transform 的区别?
Difference between Flux.compose and Flux.transform?
正在学习reactive streams,正在做Publishers(Flux),致力于Flux的改造。为此,我得到了 compose 和 transform 方法。
这是我的代码:
private static void composeStream() {
System.out.println("*********Calling composeStream************");
Function<Flux<String>, Flux<String>> alterMap = f -> {
return f.filter(color -> !color.equals("ram"))
.map(String::toUpperCase);
};
Flux<String> compose = Flux.fromIterable(Arrays.asList("ram", "sam", "kam", "dam"))
.doOnNext(System.out::println)
.compose(alterMap);
compose.subscribe(d -> System.out.println("Subscriber to Composed AlterMap :"+d));
System.out.println("-------------------------------------");
}
private static void transformStream() {
System.out.println("*********Calling transformStream************");
Function<Flux<String>, Flux<String>> alterMap = f -> f.filter(color -> !color.equals("ram"))
.map(String::toUpperCase);
Flux.fromIterable(Arrays.asList("ram", "sam", "kam", "dam"))
.doOnNext(System.out::println)
.transform(alterMap)
.subscribe(d -> System.out.println("Subscriber to Transformed AlterMap: "+d));
System.out.println("-------------------------------------");
}
这是输出,两种情况都相同:
*********Calling transformStream************
ram
sam
Subscriber to Transformed AlterMap: SAM
kam
Subscriber to Transformed AlterMap: KAM
dam
Subscriber to Transformed AlterMap: DAM
-------------------------------------
*********Calling composeStream************
ram
sam
Subscriber to Composed AlterMap :SAM
kam
Subscriber to Composed AlterMap :KAM
dam
Subscriber to Composed AlterMap :DAM
-------------------------------------
两者有什么区别?
请推荐
根据文档:
Transform this Flux
in order to generate a target Flux
. Unlike Flux#compose(Function)
, the provided function is executed as part of assembly.
这是什么意思?
如果我们像下面这样写一个小测试:
int[] counter = new int[1];
Function transformer = f -> {
counter[0]++;
return f;
}
Flux flux = flux Flux.just("")
.transform(transformer);
System.out.println(counter[0]);
flux.subscribe();
flux.subscribe();
flux.subscribe();
System.out.println(counter[0]);
在输出中我们将观察下一个结果:
1
1
也就是说在组装管道的过程中会执行一次transform函数,也就是说transform函数会被执行eagerly .
反过来,.compose
我们将获得相同代码的下一个行为
int[] counter = new int[1];
Function transformer = f -> {
counter[0]++;
return f;
}
Flux flux = flux Flux.just("")
.compose(transformer);
System.out.println(counter[0]);
flux.subscribe();
flux.subscribe();
flux.subscribe();
System.out.println(counter[0]);
并输出
0
3
这意味着对于每个订阅者转换函数将被单独执行,我们可以将这种执行视为lazy
正在学习reactive streams,正在做Publishers(Flux),致力于Flux的改造。为此,我得到了 compose 和 transform 方法。
这是我的代码:
private static void composeStream() {
System.out.println("*********Calling composeStream************");
Function<Flux<String>, Flux<String>> alterMap = f -> {
return f.filter(color -> !color.equals("ram"))
.map(String::toUpperCase);
};
Flux<String> compose = Flux.fromIterable(Arrays.asList("ram", "sam", "kam", "dam"))
.doOnNext(System.out::println)
.compose(alterMap);
compose.subscribe(d -> System.out.println("Subscriber to Composed AlterMap :"+d));
System.out.println("-------------------------------------");
}
private static void transformStream() {
System.out.println("*********Calling transformStream************");
Function<Flux<String>, Flux<String>> alterMap = f -> f.filter(color -> !color.equals("ram"))
.map(String::toUpperCase);
Flux.fromIterable(Arrays.asList("ram", "sam", "kam", "dam"))
.doOnNext(System.out::println)
.transform(alterMap)
.subscribe(d -> System.out.println("Subscriber to Transformed AlterMap: "+d));
System.out.println("-------------------------------------");
}
这是输出,两种情况都相同:
*********Calling transformStream************
ram
sam
Subscriber to Transformed AlterMap: SAM
kam
Subscriber to Transformed AlterMap: KAM
dam
Subscriber to Transformed AlterMap: DAM
-------------------------------------
*********Calling composeStream************
ram
sam
Subscriber to Composed AlterMap :SAM
kam
Subscriber to Composed AlterMap :KAM
dam
Subscriber to Composed AlterMap :DAM
-------------------------------------
两者有什么区别? 请推荐
根据文档:
Transform this
Flux
in order to generate a targetFlux
. UnlikeFlux#compose(Function)
, the provided function is executed as part of assembly.
这是什么意思?
如果我们像下面这样写一个小测试:
int[] counter = new int[1];
Function transformer = f -> {
counter[0]++;
return f;
}
Flux flux = flux Flux.just("")
.transform(transformer);
System.out.println(counter[0]);
flux.subscribe();
flux.subscribe();
flux.subscribe();
System.out.println(counter[0]);
在输出中我们将观察下一个结果:
1
1
也就是说在组装管道的过程中会执行一次transform函数,也就是说transform函数会被执行eagerly .
反过来,.compose
我们将获得相同代码的下一个行为
int[] counter = new int[1];
Function transformer = f -> {
counter[0]++;
return f;
}
Flux flux = flux Flux.just("")
.compose(transformer);
System.out.println(counter[0]);
flux.subscribe();
flux.subscribe();
flux.subscribe();
System.out.println(counter[0]);
并输出
0
3
这意味着对于每个订阅者转换函数将被单独执行,我们可以将这种执行视为lazy