spring 反应器项目的反应流背压
Reactive stream backpressure with spring reactor project
我研究和阅读了他们的文件,他们不是很明白。
我想要实现的是以下功能:
我正在使用 Spring Reactor 项目并使用 eventBus。我的事件总线正在向模块 A 抛出事件。
模块 A 应该接收事件并插入到将保存唯一值的热流中。每 250 毫秒,流应该提取所有值并对它们进行计算......等等。
例如:
eventBus 正在抛出编号为:1,2,3,2,3,2
的事件
流应该获取并保存唯一值 -> 1,2,3
250 毫秒后,流应打印数字和空值
有人知道如何开始吗?我尝试了这些例子,但没有任何效果,我想我不明白什么。有人有例子吗?
Tnx
编辑:
尝试执行下一步时,我总是遇到异常:
Stream<List<Integer>> s = Streams.wrap(p).buffer(1, TimeUnit.SECONDS);
s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i));
for (int i = 0; i < 10000; i++) {
p.onNext(i);
}
异常:
java.lang.IllegalStateException: The environment has not been initialized yet
at reactor.Environment.get(Environment.java:156) ~[reactor-core-2.0.7.RELEASE.jar:?]
at reactor.Environment.timer(Environment.java:184) ~[reactor-core-2.0.7.RELEASE.jar:?]
at reactor.rx.Stream.getTimer(Stream.java:3052) ~[reactor-stream-2.0.7.RELEASE.jar:?]
at reactor.rx.Stream.buffer(Stream.java:2246) ~[reactor-stream-2.0.7.RELEASE.jar:?]
at com.ta.ng.server.controllers.user.UserController.getUsersByOrgId(UserController.java:70) ~[classes/:?]
如您所见,如果不通过此问题,我将无法继续尝试。
顺便说一句:这仅在我使用 buffer(1, TimeUnit.SECONDS)
时发生 如果我使用 buffer(50)
例如它有效.. 虽然这不是最终解决方案,但它是一个开始。
好吧,在再次阅读文档后,我错过了这个:
static {
Environment.initialize();
}
这解决了问题。发送
我研究和阅读了他们的文件,他们不是很明白。 我想要实现的是以下功能:
我正在使用 Spring Reactor 项目并使用 eventBus。我的事件总线正在向模块 A 抛出事件。
模块 A 应该接收事件并插入到将保存唯一值的热流中。每 250 毫秒,流应该提取所有值并对它们进行计算......等等。
例如: eventBus 正在抛出编号为:1,2,3,2,3,2
的事件流应该获取并保存唯一值 -> 1,2,3 250 毫秒后,流应打印数字和空值
有人知道如何开始吗?我尝试了这些例子,但没有任何效果,我想我不明白什么。有人有例子吗?
Tnx
编辑:
尝试执行下一步时,我总是遇到异常:
Stream<List<Integer>> s = Streams.wrap(p).buffer(1, TimeUnit.SECONDS);
s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i));
for (int i = 0; i < 10000; i++) {
p.onNext(i);
}
异常:
java.lang.IllegalStateException: The environment has not been initialized yet
at reactor.Environment.get(Environment.java:156) ~[reactor-core-2.0.7.RELEASE.jar:?]
at reactor.Environment.timer(Environment.java:184) ~[reactor-core-2.0.7.RELEASE.jar:?]
at reactor.rx.Stream.getTimer(Stream.java:3052) ~[reactor-stream-2.0.7.RELEASE.jar:?]
at reactor.rx.Stream.buffer(Stream.java:2246) ~[reactor-stream-2.0.7.RELEASE.jar:?]
at com.ta.ng.server.controllers.user.UserController.getUsersByOrgId(UserController.java:70) ~[classes/:?]
如您所见,如果不通过此问题,我将无法继续尝试。
顺便说一句:这仅在我使用 buffer(1, TimeUnit.SECONDS)
时发生 如果我使用 buffer(50)
例如它有效.. 虽然这不是最终解决方案,但它是一个开始。
好吧,在再次阅读文档后,我错过了这个:
static {
Environment.initialize();
}
这解决了问题。发送