将 io.projectreactor 版本从 2.0.x 升级到 3.0.4 - 使用 Spring 框架
Upgrading io.projectreactor version from 2.0.x to 3.0.4 - Using Spring framework
我在尝试升级时遇到问题。
目前我使用的是版本 2.0.x,尤其是 -
reactor.bus
reactor.rx.Stream
reactor.rx.Streams
reactor.core.processor.RingBufferProcessor
reactor.fn.Consumer
我正在使用 maven,并且我有一个关于 'projectreactor' -
的依赖项
<groupId>io.projectreactor.spring</groupId>
<artifactId>reactor-spring-core</artifactId>
当升级到3.0.4.RELEASE版本时,为了继续使用我以前用过的所有东西,我需要显式导入-
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bus</artifactId>
和
<groupId>io.projectreactor</groupId>
<artifactId>reactor-stream</artifactId>
但我还是不见了
reactor.core.processor.RingBufferProcessor
reactor.fn.Consumer
我不知道该怎么做。
reactor.fn.Consumer
替换为 Java 8 java.util.function.Consumer
.
至于 RingBufferProcessor
你必须选择一个 new processors 全部使用环形缓冲区。
Dispatcher
现在是 Schedulers,在后台使用 Java 的 Executor
。
reactor.rx.Stream -> reactor.core.publisher.Flux
reactor.rx.Streams -> reactor.core.publisher.Flux
reactor.rx.Promise -> reactor.core.publisher.Mono and reactor.core.publisher.MonoProcessor
reactor.core.processor.RingBufferProcessor -> reactor.core.publisher.TopicProcessor
reactor.fn.Consumer -> java.until.function.Consumer (Java 8)
没有新的 spring 模块,因为 spring 5 直接包含对这些新类型的 Reactor 支持。
至于 reactor-bus :
根据设计,现在所有流路由(Flux/Mono 链)都是类型化的,因此动态路由还不是我们功能的一部分。仍然有其他类型的选择,例如:
ReplayProcessor<MyEvent> rp = ReplayProcessor.create();
Flux<MyEvent> interest1 = rp.filter(ev -> filterInterest1(ev));
Flux<MyEvent> interest2 = rp.filter(ev -> filterInterest2(ev));
Flux<MyEvent> interest1_2 = rp.filter(ev -> filterInterest1(ev) || filterInterest2(ev));
interest1.subscribe(doSomethingForInterest1);
interest2.subscribe(doSomethingForInterest2);
interest1_2.subscribe(doSomethingForInterest1_2);
rp.onNext(new MyEvent("interest1")); //subscriber 1 and 3 react
rp.onNext(new MyEvent("interest1")); //subscriber 1 and 3 react
rp.onNext(new MyEvent("interest2")); //subscriber 2 and 3 react
rp.onNext(new MyEvent("interest4")); // buffered until interest subscriber because ReplayProcessor
//shutdown/cleanup/close
rp.onComplete();
我在 github 上找到了这个,它似乎符合您的需要
我在尝试升级时遇到问题。
目前我使用的是版本 2.0.x,尤其是 -
reactor.bus
reactor.rx.Stream
reactor.rx.Streams
reactor.core.processor.RingBufferProcessor
reactor.fn.Consumer
我正在使用 maven,并且我有一个关于 'projectreactor' -
的依赖项<groupId>io.projectreactor.spring</groupId>
<artifactId>reactor-spring-core</artifactId>
当升级到3.0.4.RELEASE版本时,为了继续使用我以前用过的所有东西,我需要显式导入-
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bus</artifactId>
和
<groupId>io.projectreactor</groupId>
<artifactId>reactor-stream</artifactId>
但我还是不见了
reactor.core.processor.RingBufferProcessor
reactor.fn.Consumer
我不知道该怎么做。
reactor.fn.Consumer
替换为 Java 8 java.util.function.Consumer
.
至于 RingBufferProcessor
你必须选择一个 new processors 全部使用环形缓冲区。
Dispatcher
现在是 Schedulers,在后台使用 Java 的 Executor
。
reactor.rx.Stream -> reactor.core.publisher.Flux
reactor.rx.Streams -> reactor.core.publisher.Flux
reactor.rx.Promise -> reactor.core.publisher.Mono and reactor.core.publisher.MonoProcessor
reactor.core.processor.RingBufferProcessor -> reactor.core.publisher.TopicProcessor
reactor.fn.Consumer -> java.until.function.Consumer (Java 8)
没有新的 spring 模块,因为 spring 5 直接包含对这些新类型的 Reactor 支持。
至于 reactor-bus : 根据设计,现在所有流路由(Flux/Mono 链)都是类型化的,因此动态路由还不是我们功能的一部分。仍然有其他类型的选择,例如:
ReplayProcessor<MyEvent> rp = ReplayProcessor.create();
Flux<MyEvent> interest1 = rp.filter(ev -> filterInterest1(ev));
Flux<MyEvent> interest2 = rp.filter(ev -> filterInterest2(ev));
Flux<MyEvent> interest1_2 = rp.filter(ev -> filterInterest1(ev) || filterInterest2(ev));
interest1.subscribe(doSomethingForInterest1);
interest2.subscribe(doSomethingForInterest2);
interest1_2.subscribe(doSomethingForInterest1_2);
rp.onNext(new MyEvent("interest1")); //subscriber 1 and 3 react
rp.onNext(new MyEvent("interest1")); //subscriber 1 and 3 react
rp.onNext(new MyEvent("interest2")); //subscriber 2 and 3 react
rp.onNext(new MyEvent("interest4")); // buffered until interest subscriber because ReplayProcessor
//shutdown/cleanup/close
rp.onComplete();
我在 github 上找到了这个,它似乎符合您的需要