好的 implementation/support java.util.concurrent.Flow.Processor<T,R>

Good implementation/support for java.util.concurrent.Flow.Processor<T,R>

最近,我发现 projectreactor.io 对 Publisher 有很好的支持:

Flux.create(fluxSink -> {
           for (int i = 0; i < 10; i++)
            fluxSink.next(i);
           fluxSink.complete();
        })
                .map(...)
                .subscribe(...);

Proccessor有什么好的支持吗? 我的意思是类似或类似的东西:

XXX process = new XXX((inputValue, output) -> {
    if(inputValue == 0)
       output.error();
    else
       output.next(inputValue);
});

publisher.subscribe(process);  
process.subscribe(...);

如果没有,我该如何实现我自己的或者为什么我不能这样做?

更新 1:

经过讨论(见评论)看来在我的用例中我需要使用 flatMap(见答案),我的问题是通过这个处理器的良好实现我的意思是一些功能,如果它失败我可以取而代之的是控制并发出错误。我认为 flatMap 将为您提供足够的功能。就我而言,我使用了:

        import org.jsoup.Jsoup;

        Flux.just("url")
            .flatMap(url -> {
                try {
                    Document document = Jsoup.connect(url).get();
                    return Flux.just(document);
                } catch (IOException e) {
                    return Flux.error(e);
                }
            })
            .subscribe();

这真的取决于你想做什么。

Flux 上的大多数方法都创建了这样的处理器,只是 return 它们作为 Flux 确保它们以正确的方式订阅上游 Flux

因此,如果您的 Processor 应该只为它接收到的每个事件发出一个事件,但是不同的事件 map 是您创建 Processor 的简单方法。如果它为接收到的每个事件创建多个(或没有)事件,请使用 flatMap 等等。

您可以通过链接这些方法来创建更复杂的方法。 我希望 99% 的用例都能以这种方式处理。

如果这还不够,请考虑 subscribe 的各种重载,您可以在其中使用 Consumer 来处理 Flux 的元素以及状态更改,例如错误、完成和订阅。您可以将它们与 Flux.create(fluxSink -> ...) 结合起来,以构建非常灵活的 Processors

您可能正在寻找 SubmissionPublisher which seems similar to the Flux 在 reactor 中的实现:

A Flow.Publisher that asynchronously issues submitted (non-null) items to current subscribers until it is closed. Each current subscriber receives newly submitted items in the same order unless drops or exceptions are encountered. Using a SubmissionPublisher allows item generators to act as compliant reactive-streams Publishers relying on drop handling and/or blocking for flow control.

注意:自定义示例Flow.Processor在link中共享,可以进一步自定义为根据用例的需要处理 onErrorconsume 方法实现。

根据您对用例的描述,我不认为您真的需要 Processor。相反,使用 flatMap 触发异步 URL 提取。 flatMap,与所有 Reactive Streams 运算符一样,将默认在出现错误时立即停止。

您可能需要处理器的唯一部分是生成 Flux<URL> 如果您事先不知道 URL(否则, Flux.fromIterableFlux.just(...) 就可以了。

如果您需要在不重新触发请求的情况下将结果分派给多个 Subscriber,请查看 publish().connect() and/or cache()