Project Reactor:我需要处理器吗?

Project Reactor: Do I need a Processor?

我正在尝试在 Reactor 之上设计一个管道框架。

在每个阶段(不考虑第一个和最后一个),我们都有转换对象的任务(即字符串到它的长度或 url 到它的 HTML 内容等)。这是一个例子:

你可以看到中间层有 3 个任务,每个任务将一个 X 对象转换为一个 Y 对象(顺便说一下,它总是一个全连接层)

我的Question/Dilemma: 我的第一个想法是,我只需要 Flux.merge(),然后将其连接到每个订阅者。例如:

Flux<X> source = Flux.merge(x1Flux, x2Flux)  
source.subscribe(y1Subscriber)
source.subscribe(y2Subscriber)

另一种选择是放置一个处理器(TopicProcessor?)作为中间件(就像在发布-订阅模式中一样)

我不了解哪种解决方案最适合我的问题。逻辑上是一样的,但每种架构的实际含义是什么?

谢谢!

我这里的一般方法是使用 ConnectableFlux 来延迟发布,直到您完成整个管道设置,然后在设置完每个通量后调用 connect()管道。

可以使用处理器,但我建议尽可能避免使用。

一般要点(未检查语法)类似于:

ConnectableFlux<String> x1 = Flux.just("x1").publish();
ConnectableFlux<String> x2 = Flux.just("x2").publish();

ConnectableFlux<String> y1 = Flux.<String>from(Flux.merge(x1, x2)).publish();
ConnectableFlux<String> y2 = Flux.<String>from(Flux.merge(x1, x2)).publish();
ConnectableFlux<String> y3 = Flux.<String>from(Flux.merge(x1, x2)).publish();

ConnectableFlux<String> z3 = Flux.<String>from(Flux.merge(y1, y2, y3)).publish();

x1.connect();
x2.connect();
y1.connect();
//...etc.

另请注意,您可能希望使用 concat()mergeSequential() 而不是 merge(),具体取决于您的用例(merge() 会热切地订阅发布者,concat() 不会,mergeSequential() 将按收到的顺序合并,可能会交错这些值。)