无法压缩通量和 return 空值。我们如何丢弃传递到通量中的事件对?

Cannot zip a flux and return null values. How can we discard an event pair passed into a flux?

假设你有一组 Flux,你想用双函数压缩在一起。

Flux<String> flux1 = Flux.just("A", "B", "C");
Flux<String> flux2 = Flux.just("D", "E", "F");
Flux.zip(flux1, flux2, this::zipString).subscribe(System.out::println);

This bifunction below will return null if an object meets a certain constraint. So that we can possibly apply a filter after zipping the Flux together.

public String zipString(String a, String b) {
    if (a.equals("B"))
        return null;
    return a + b;
}

这个策略最终会抛出 NullPointerException

Exception in thread "main" 
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException: The zipper returned a null value
Caused by: java.lang.NullPointerException: The zipper returned a null value
    at java.util.Objects.requireNonNull(Objects.java:228)
    at reactor.core.publisher.FluxZip$ZipCoordinator.drain(FluxZip.java:711)
    at reactor.core.publisher.FluxZip$ZipInner.onSubscribe(FluxZip.java:861)
    at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
    at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)

编辑:附带说明,当您有一个包含 null 的 Flux 时,也会发生这种情况。

Flux<String> flux2 = Flux.just(null, "B", "C");

所以这让我问。为什么禁止 null 值进入 Flux?

缓解以下用例的一些潜在策略是什么:

flux2 contains a value that we should discard, therefore the set should be discarded.

反应规范不允许在流中使用 null。选择一个像 "n/a" 这样的常量,稍后将其过滤掉。

https://github.com/reactive-streams/reactive-streams-jvm#2-subscriber-code

  1. Calling onSubscribe, onNext, onError or onComplete MUST return normally except when any provided parameter is null in which case it MUST throw a java.lang.NullPointerException to the caller

感觉应该在处理反应流一段时间后重新审视。有两种方法可以比在下游过滤默认值更优雅地处理这些情况。

元组(首选和最优)

Flux.zip(flux1,flux2)
     .filter(t -> !t.getT1().equals("B") || !t.getT2().equals("B"))
     .map(this::zipString)
     .subscribe(System.out::println)

压缩策略返回可选

public Optional<String> zipString(String a, String b) {
    if (a.equals("B"))
        return Optional.empty();
    return Optional.of(a + b);
}

Flux.zip(flux1, flux2, this::zipString)
    .filter(Optional::isPresent)
    .map(Optional::get)
    .subscribe(System.out::println);