filterWhen 与 flatMap 结合使用会在 Reactor 中抛出异常
filterWhen in combination with flatMap throws exception in Reactor
我想过滤 windows 所有值都满足特定条件的地方,然后打印出过滤后的通量:
Flux.just(4, 2, 6, 4, 5, 6, 7, 8, 9)
.window(3)
.filterWhen(window -> window.all(n -> n % 2 == 0))
.flatMap(window -> window)
.subscribe(System.out::println);
然而结果不是4, 2, 6
而是
java.lang.IllegalStateException: UnicastProcessor allows only a single Subscriber.
如果我尝试相同但没有 filterWhen
它会毫无例外地工作:
Flux.just(4, 2, 6, 4, 5, 6, 7, 8, 9)
.window(3)
.flatMap(window -> window)
.subscribe(System.out::println);
如果我在没有 flatMap
的情况下尝试,也不会抛出异常:
Flux.just(4, 2, 6, 4, 5, 6, 7, 8, 9)
.window(3)
.filterWhen(window -> window.all(n -> n % 2 == 0))
.subscribe(System.out::println);
然而,这两种方法显然都没有产生预期的结果。但是filterWhen
和flatMap
的组合好像有问题!
我的第一个例子有什么问题?另一个订阅者在哪里?
如何获取值?
这目前不可能,因为 window
只能订阅 ("consumed") 一次。 flatMap
和 filterWhen
都将订阅 window,因此你的错误。
您可以改为使用 buffer(3)
而不是 window(3)
,过滤缓冲区,然后从通过过滤器的缓冲区发出值:
Flux.just(4, 2, 6, 4, 5, 6, 7, 8, 9)
.buffer(3)
.filter(list -> list.stream().allMatch(n -> n % 2 == 0))
.flatMapIterable(Function.identity())
.subscribe(System.out::println);
我想过滤 windows 所有值都满足特定条件的地方,然后打印出过滤后的通量:
Flux.just(4, 2, 6, 4, 5, 6, 7, 8, 9)
.window(3)
.filterWhen(window -> window.all(n -> n % 2 == 0))
.flatMap(window -> window)
.subscribe(System.out::println);
然而结果不是4, 2, 6
而是
java.lang.IllegalStateException: UnicastProcessor allows only a single Subscriber.
如果我尝试相同但没有 filterWhen
它会毫无例外地工作:
Flux.just(4, 2, 6, 4, 5, 6, 7, 8, 9)
.window(3)
.flatMap(window -> window)
.subscribe(System.out::println);
如果我在没有 flatMap
的情况下尝试,也不会抛出异常:
Flux.just(4, 2, 6, 4, 5, 6, 7, 8, 9)
.window(3)
.filterWhen(window -> window.all(n -> n % 2 == 0))
.subscribe(System.out::println);
然而,这两种方法显然都没有产生预期的结果。但是filterWhen
和flatMap
的组合好像有问题!
我的第一个例子有什么问题?另一个订阅者在哪里?
如何获取值?
这目前不可能,因为 window
只能订阅 ("consumed") 一次。 flatMap
和 filterWhen
都将订阅 window,因此你的错误。
您可以改为使用 buffer(3)
而不是 window(3)
,过滤缓冲区,然后从通过过滤器的缓冲区发出值:
Flux.just(4, 2, 6, 4, 5, 6, 7, 8, 9)
.buffer(3)
.filter(list -> list.stream().allMatch(n -> n % 2 == 0))
.flatMapIterable(Function.identity())
.subscribe(System.out::println);