无法压缩通量和 return 空值。我们如何丢弃传递到通量中的事件对?
Cannot zip a flux and return null values. How can we discard an event pair passed into a flux?
假设你有一组 Flux
,你想用双函数压缩在一起。
Flux<String> flux1 = Flux.just("A", "B", "C");
Flux<String> flux2 = Flux.just("D", "E", "F");
Flux.zip(flux1, flux2, this::zipString).subscribe(System.out::println);
This bifunction below will return null
if an object meets a certain constraint. So that we can possibly apply a filter after zipping the Flux
together.
public String zipString(String a, String b) {
if (a.equals("B"))
return null;
return a + b;
}
这个策略最终会抛出 NullPointerException
。
Exception in thread "main"
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException: The zipper returned a null value
Caused by: java.lang.NullPointerException: The zipper returned a null value
at java.util.Objects.requireNonNull(Objects.java:228)
at reactor.core.publisher.FluxZip$ZipCoordinator.drain(FluxZip.java:711)
at reactor.core.publisher.FluxZip$ZipInner.onSubscribe(FluxZip.java:861)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
编辑:附带说明,当您有一个包含 null 的 Flux
时,也会发生这种情况。
Flux<String> flux2 = Flux.just(null, "B", "C");
所以这让我问。为什么禁止 null
值进入 Flux?
缓解以下用例的一些潜在策略是什么:
flux2 contains a value that we should discard, therefore the set should be discarded.
反应规范不允许在流中使用 null
。选择一个像 "n/a" 这样的常量,稍后将其过滤掉。
https://github.com/reactive-streams/reactive-streams-jvm#2-subscriber-code
- Calling onSubscribe, onNext, onError or onComplete MUST return
normally except when any provided parameter is null in which case it
MUST throw a java.lang.NullPointerException to the caller
感觉应该在处理反应流一段时间后重新审视。有两种方法可以比在下游过滤默认值更优雅地处理这些情况。
元组(首选和最优)
Flux.zip(flux1,flux2)
.filter(t -> !t.getT1().equals("B") || !t.getT2().equals("B"))
.map(this::zipString)
.subscribe(System.out::println)
压缩策略返回可选
public Optional<String> zipString(String a, String b) {
if (a.equals("B"))
return Optional.empty();
return Optional.of(a + b);
}
Flux.zip(flux1, flux2, this::zipString)
.filter(Optional::isPresent)
.map(Optional::get)
.subscribe(System.out::println);
假设你有一组 Flux
,你想用双函数压缩在一起。
Flux<String> flux1 = Flux.just("A", "B", "C");
Flux<String> flux2 = Flux.just("D", "E", "F");
Flux.zip(flux1, flux2, this::zipString).subscribe(System.out::println);
This bifunction below will return
null
if an object meets a certain constraint. So that we can possibly apply a filter after zipping theFlux
together.
public String zipString(String a, String b) {
if (a.equals("B"))
return null;
return a + b;
}
这个策略最终会抛出 NullPointerException
。
Exception in thread "main"
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException: The zipper returned a null value
Caused by: java.lang.NullPointerException: The zipper returned a null value
at java.util.Objects.requireNonNull(Objects.java:228)
at reactor.core.publisher.FluxZip$ZipCoordinator.drain(FluxZip.java:711)
at reactor.core.publisher.FluxZip$ZipInner.onSubscribe(FluxZip.java:861)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
编辑:附带说明,当您有一个包含 null 的 Flux
时,也会发生这种情况。
Flux<String> flux2 = Flux.just(null, "B", "C");
所以这让我问。为什么禁止 null
值进入 Flux?
缓解以下用例的一些潜在策略是什么:
flux2 contains a value that we should discard, therefore the set should be discarded.
反应规范不允许在流中使用 null
。选择一个像 "n/a" 这样的常量,稍后将其过滤掉。
https://github.com/reactive-streams/reactive-streams-jvm#2-subscriber-code
- Calling onSubscribe, onNext, onError or onComplete MUST return normally except when any provided parameter is null in which case it MUST throw a java.lang.NullPointerException to the caller
感觉应该在处理反应流一段时间后重新审视。有两种方法可以比在下游过滤默认值更优雅地处理这些情况。
元组(首选和最优)
Flux.zip(flux1,flux2)
.filter(t -> !t.getT1().equals("B") || !t.getT2().equals("B"))
.map(this::zipString)
.subscribe(System.out::println)
压缩策略返回可选
public Optional<String> zipString(String a, String b) {
if (a.equals("B"))
return Optional.empty();
return Optional.of(a + b);
}
Flux.zip(flux1, flux2, this::zipString)
.filter(Optional::isPresent)
.map(Optional::get)
.subscribe(System.out::println);