好的 implementation/support java.util.concurrent.Flow.Processor<T,R>
Good implementation/support for java.util.concurrent.Flow.Processor<T,R>
最近,我发现 projectreactor.io 对 Publisher 有很好的支持:
Flux.create(fluxSink -> {
for (int i = 0; i < 10; i++)
fluxSink.next(i);
fluxSink.complete();
})
.map(...)
.subscribe(...);
Proccessor有什么好的支持吗?
我的意思是类似或类似的东西:
XXX process = new XXX((inputValue, output) -> {
if(inputValue == 0)
output.error();
else
output.next(inputValue);
});
publisher.subscribe(process);
process.subscribe(...);
如果没有,我该如何实现我自己的或者为什么我不能这样做?
更新 1:
经过讨论(见评论)看来在我的用例中我需要使用 flatMap
(见答案),我的问题是通过这个处理器的良好实现我的意思是一些功能,如果它失败我可以取而代之的是控制并发出错误。我认为 flatMap
将为您提供足够的功能。就我而言,我使用了:
import org.jsoup.Jsoup;
Flux.just("url")
.flatMap(url -> {
try {
Document document = Jsoup.connect(url).get();
return Flux.just(document);
} catch (IOException e) {
return Flux.error(e);
}
})
.subscribe();
这真的取决于你想做什么。
Flux
上的大多数方法都创建了这样的处理器,只是 return 它们作为 Flux
确保它们以正确的方式订阅上游 Flux
。
因此,如果您的 Processor
应该只为它接收到的每个事件发出一个事件,但是不同的事件 map
是您创建 Processor
的简单方法。如果它为接收到的每个事件创建多个(或没有)事件,请使用 flatMap
等等。
您可以通过链接这些方法来创建更复杂的方法。
我希望 99% 的用例都能以这种方式处理。
如果这还不够,请考虑 subscribe
的各种重载,您可以在其中使用 Consumer
来处理 Flux
的元素以及状态更改,例如错误、完成和订阅。您可以将它们与 Flux.create(fluxSink -> ...)
结合起来,以构建非常灵活的 Processors
。
您可能正在寻找 SubmissionPublisher
which seems similar to the Flux
在 reactor 中的实现:
A Flow.Publisher
that asynchronously issues submitted (non-null)
items to current subscribers until it is closed. Each current
subscriber receives newly submitted items in the same order unless
drops or exceptions are encountered. Using a SubmissionPublisher
allows item generators to act as compliant reactive-streams Publishers
relying on drop handling and/or blocking for flow control.
注意:自定义示例Flow.Processor
在link中共享,可以进一步自定义为根据用例的需要处理 onError
和 consume
方法实现。
根据您对用例的描述,我不认为您真的需要 Processor
。相反,使用 flatMap
触发异步 URL 提取。 flatMap
,与所有 Reactive Streams 运算符一样,将默认在出现错误时立即停止。
您可能需要处理器的唯一部分是生成 Flux<URL>
如果您事先不知道 URL(否则, Flux.fromIterable
或 Flux.just(...)
就可以了。
如果您需要在不重新触发请求的情况下将结果分派给多个 Subscriber
,请查看 publish().connect()
and/or cache()
。
最近,我发现 projectreactor.io 对 Publisher 有很好的支持:
Flux.create(fluxSink -> {
for (int i = 0; i < 10; i++)
fluxSink.next(i);
fluxSink.complete();
})
.map(...)
.subscribe(...);
Proccessor有什么好的支持吗? 我的意思是类似或类似的东西:
XXX process = new XXX((inputValue, output) -> {
if(inputValue == 0)
output.error();
else
output.next(inputValue);
});
publisher.subscribe(process);
process.subscribe(...);
如果没有,我该如何实现我自己的或者为什么我不能这样做?
更新 1:
经过讨论(见评论)看来在我的用例中我需要使用 flatMap
(见答案),我的问题是通过这个处理器的良好实现我的意思是一些功能,如果它失败我可以取而代之的是控制并发出错误。我认为 flatMap
将为您提供足够的功能。就我而言,我使用了:
import org.jsoup.Jsoup;
Flux.just("url")
.flatMap(url -> {
try {
Document document = Jsoup.connect(url).get();
return Flux.just(document);
} catch (IOException e) {
return Flux.error(e);
}
})
.subscribe();
这真的取决于你想做什么。
Flux
上的大多数方法都创建了这样的处理器,只是 return 它们作为 Flux
确保它们以正确的方式订阅上游 Flux
。
因此,如果您的 Processor
应该只为它接收到的每个事件发出一个事件,但是不同的事件 map
是您创建 Processor
的简单方法。如果它为接收到的每个事件创建多个(或没有)事件,请使用 flatMap
等等。
您可以通过链接这些方法来创建更复杂的方法。 我希望 99% 的用例都能以这种方式处理。
如果这还不够,请考虑 subscribe
的各种重载,您可以在其中使用 Consumer
来处理 Flux
的元素以及状态更改,例如错误、完成和订阅。您可以将它们与 Flux.create(fluxSink -> ...)
结合起来,以构建非常灵活的 Processors
。
您可能正在寻找 SubmissionPublisher
which seems similar to the Flux
在 reactor 中的实现:
A
Flow.Publisher
that asynchronously issues submitted (non-null) items to current subscribers until it is closed. Each current subscriber receives newly submitted items in the same order unless drops or exceptions are encountered. Using aSubmissionPublisher
allows item generators to act as compliant reactive-streams Publishers relying on drop handling and/or blocking for flow control.
注意:自定义示例Flow.Processor
在link中共享,可以进一步自定义为根据用例的需要处理 onError
和 consume
方法实现。
根据您对用例的描述,我不认为您真的需要 Processor
。相反,使用 flatMap
触发异步 URL 提取。 flatMap
,与所有 Reactive Streams 运算符一样,将默认在出现错误时立即停止。
您可能需要处理器的唯一部分是生成 Flux<URL>
如果您事先不知道 URL(否则, Flux.fromIterable
或 Flux.just(...)
就可以了。
如果您需要在不重新触发请求的情况下将结果分派给多个 Subscriber
,请查看 publish().connect()
and/or cache()
。