filterWhen 与 flatMap 结合使用会在 Reactor 中抛出异常

filterWhen in combination with flatMap throws exception in Reactor

我想过滤 windows 所有值都满足特定条件的地方,然后打印出过滤后的通量:

Flux.just(4, 2, 6, 4, 5, 6, 7, 8, 9)
        .window(3)
        .filterWhen(window -> window.all(n -> n % 2 == 0))
        .flatMap(window -> window)
        .subscribe(System.out::println);

然而结果不是4, 2, 6而是

java.lang.IllegalStateException: UnicastProcessor allows only a single Subscriber.

如果我尝试相同但没有 filterWhen 它会毫无例外地工作:

Flux.just(4, 2, 6, 4, 5, 6, 7, 8, 9)
        .window(3)
        .flatMap(window -> window)
        .subscribe(System.out::println);

如果我在没有 flatMap 的情况下尝试,也不会抛出异常:

Flux.just(4, 2, 6, 4, 5, 6, 7, 8, 9)
        .window(3)
        .filterWhen(window -> window.all(n -> n % 2 == 0))
        .subscribe(System.out::println);

然而,这两种方法显然都没有产生预期的结果。但是filterWhenflatMap的组合好像有问题!

我的第一个例子有什么问题?另一个订阅者在哪里?

如何获取值?

这目前不可能,因为 window 只能订阅 ("consumed") 一次。 flatMapfilterWhen 都将订阅 window,因此你的错误。

您可以改为使用 buffer(3) 而不是 window(3),过滤缓冲区,然后从通过过滤器的缓冲区发出值:

Flux.just(4, 2, 6, 4, 5, 6, 7, 8, 9)
    .buffer(3)
    .filter(list -> list.stream().allMatch(n -> n % 2 == 0))
    .flatMapIterable(Function.identity())
    .subscribe(System.out::println);