Spring Flux(或 Java Stream)合并为左连接
Spring Flux ( or Java Stream ) merged as a left join
如何加入这两个Flux
val a: Flux<String> = Flux.just("foo", "bar", "baz", "foobar")
val b: Flux<Pair<String, Int>> = Flux.just( Pair("foo", 5), Pair("baz", 5))
要像这样输出另一个 Flux ?
val c: Flux<Pair<String, Int>> = Flux.just(
Pair("foo", 5),
Pair("bar", 0),
Pair("baz", 5),
Pair("foobar",0)
)
简而言之,我需要 A 的所有元素,如果存在 B 的值,zero/null 否则,就像 sql 左连接
我正在查看 zip
、zipWith
、merge
,但我有点困惑。有什么提示吗?
谢谢
对 Kotlin 不是很熟悉,但由于您也要求过任一流解决方案,您可以得出与以下示例的相似之处:
List<String> list = List.of("foo", "bar", "baz", "foobar");
List<Pair<String, Integer>> pairs = List.of(Pair.of("foo", 5), Pair.of("baz", 5));
List<Pair<String, Integer>> result = list.stream()
.map(a ->
pairs.stream()
.filter(p -> p.getLeft().equals(a))
.findFirst()
.orElse(Pair.of(a, 0)))
.collect(Collectors.toList());
Flux.from(a)
.map(s -> {
return b.filter(strPair -> s.equals(strPair.fst)).switchIfEmpty(Flux.just(Pair.of(s,0)));
}).flatMap(Flux::next).log().subscribe();
输出:
10:26:00.358 [main] INFO reactor.Flux.FlatMap.3 - request(unbounded)
10:26:00.591 [main] INFO reactor.Flux.FlatMap.3 - onNext(Pair[foo,5])
10:26:00.593 [main] INFO reactor.Flux.FlatMap.3 - onNext(Pair[bar,0])
10:26:00.594 [main] INFO reactor.Flux.FlatMap.3 - onNext(Pair[baz,5])
10:26:00.594 [main] INFO reactor.Flux.FlatMap.3 - onNext(Pair[foobar,0])
10:26:00.594 [main] INFO reactor.Flux.FlatMap.3 - onComplete()
如何加入这两个Flux
val a: Flux<String> = Flux.just("foo", "bar", "baz", "foobar")
val b: Flux<Pair<String, Int>> = Flux.just( Pair("foo", 5), Pair("baz", 5))
要像这样输出另一个 Flux ?
val c: Flux<Pair<String, Int>> = Flux.just(
Pair("foo", 5),
Pair("bar", 0),
Pair("baz", 5),
Pair("foobar",0)
)
简而言之,我需要 A 的所有元素,如果存在 B 的值,zero/null 否则,就像 sql 左连接
我正在查看 zip
、zipWith
、merge
,但我有点困惑。有什么提示吗?
谢谢
对 Kotlin 不是很熟悉,但由于您也要求过任一流解决方案,您可以得出与以下示例的相似之处:
List<String> list = List.of("foo", "bar", "baz", "foobar");
List<Pair<String, Integer>> pairs = List.of(Pair.of("foo", 5), Pair.of("baz", 5));
List<Pair<String, Integer>> result = list.stream()
.map(a ->
pairs.stream()
.filter(p -> p.getLeft().equals(a))
.findFirst()
.orElse(Pair.of(a, 0)))
.collect(Collectors.toList());
Flux.from(a)
.map(s -> {
return b.filter(strPair -> s.equals(strPair.fst)).switchIfEmpty(Flux.just(Pair.of(s,0)));
}).flatMap(Flux::next).log().subscribe();
输出:
10:26:00.358 [main] INFO reactor.Flux.FlatMap.3 - request(unbounded)
10:26:00.591 [main] INFO reactor.Flux.FlatMap.3 - onNext(Pair[foo,5])
10:26:00.593 [main] INFO reactor.Flux.FlatMap.3 - onNext(Pair[bar,0])
10:26:00.594 [main] INFO reactor.Flux.FlatMap.3 - onNext(Pair[baz,5])
10:26:00.594 [main] INFO reactor.Flux.FlatMap.3 - onNext(Pair[foobar,0])
10:26:00.594 [main] INFO reactor.Flux.FlatMap.3 - onComplete()