Project Reactor:我需要处理器吗?
Project Reactor: Do I need a Processor?
我正在尝试在 Reactor 之上设计一个管道框架。
在每个阶段(不考虑第一个和最后一个),我们都有转换对象的任务(即字符串到它的长度或 url 到它的 HTML 内容等)。这是一个例子:
你可以看到中间层有 3 个任务,每个任务将一个 X 对象转换为一个 Y 对象(顺便说一下,它总是一个全连接层)
我的Question/Dilemma:
我的第一个想法是,我只需要 Flux.merge()
,然后将其连接到每个订阅者。例如:
Flux<X> source = Flux.merge(x1Flux, x2Flux)
source.subscribe(y1Subscriber)
source.subscribe(y2Subscriber)
另一种选择是放置一个处理器(TopicProcessor?)作为中间件(就像在发布-订阅模式中一样)
我不了解哪种解决方案最适合我的问题。逻辑上是一样的,但每种架构的实际含义是什么?
谢谢!
我这里的一般方法是使用 ConnectableFlux
来延迟发布,直到您完成整个管道设置,然后在设置完每个通量后调用 connect()
管道。
您可以使用处理器,但我建议尽可能避免使用。
一般要点(未检查语法)类似于:
ConnectableFlux<String> x1 = Flux.just("x1").publish();
ConnectableFlux<String> x2 = Flux.just("x2").publish();
ConnectableFlux<String> y1 = Flux.<String>from(Flux.merge(x1, x2)).publish();
ConnectableFlux<String> y2 = Flux.<String>from(Flux.merge(x1, x2)).publish();
ConnectableFlux<String> y3 = Flux.<String>from(Flux.merge(x1, x2)).publish();
ConnectableFlux<String> z3 = Flux.<String>from(Flux.merge(y1, y2, y3)).publish();
x1.connect();
x2.connect();
y1.connect();
//...etc.
另请注意,您可能希望使用 concat()
或 mergeSequential()
而不是 merge()
,具体取决于您的用例(merge()
会热切地订阅发布者,concat()
不会,mergeSequential()
将按收到的顺序合并,可能会交错这些值。)
我正在尝试在 Reactor 之上设计一个管道框架。
在每个阶段(不考虑第一个和最后一个),我们都有转换对象的任务(即字符串到它的长度或 url 到它的 HTML 内容等)。这是一个例子:
你可以看到中间层有 3 个任务,每个任务将一个 X 对象转换为一个 Y 对象(顺便说一下,它总是一个全连接层)
我的Question/Dilemma:
我的第一个想法是,我只需要 Flux.merge()
,然后将其连接到每个订阅者。例如:
Flux<X> source = Flux.merge(x1Flux, x2Flux)
source.subscribe(y1Subscriber)
source.subscribe(y2Subscriber)
另一种选择是放置一个处理器(TopicProcessor?)作为中间件(就像在发布-订阅模式中一样)
我不了解哪种解决方案最适合我的问题。逻辑上是一样的,但每种架构的实际含义是什么?
谢谢!
我这里的一般方法是使用 ConnectableFlux
来延迟发布,直到您完成整个管道设置,然后在设置完每个通量后调用 connect()
管道。
您可以使用处理器,但我建议尽可能避免使用。
一般要点(未检查语法)类似于:
ConnectableFlux<String> x1 = Flux.just("x1").publish();
ConnectableFlux<String> x2 = Flux.just("x2").publish();
ConnectableFlux<String> y1 = Flux.<String>from(Flux.merge(x1, x2)).publish();
ConnectableFlux<String> y2 = Flux.<String>from(Flux.merge(x1, x2)).publish();
ConnectableFlux<String> y3 = Flux.<String>from(Flux.merge(x1, x2)).publish();
ConnectableFlux<String> z3 = Flux.<String>from(Flux.merge(y1, y2, y3)).publish();
x1.connect();
x2.connect();
y1.connect();
//...etc.
另请注意,您可能希望使用 concat()
或 mergeSequential()
而不是 merge()
,具体取决于您的用例(merge()
会热切地订阅发布者,concat()
不会,mergeSequential()
将按收到的顺序合并,可能会交错这些值。)