Java 通量与 Observable/BehaviorSubject

Java Flux vs. Observable/BehaviorSubject

我的问题是 Flux 是否能够表现得像 Observable 或 BehaviorSubject。我想我明白了 Flux 的作用和方式,但是我看到的每个教程都会创建一个静态内容的 Flux,即一些预先存在的数字数组,这些数组本质上是有限的。

但是,我希望我的 Flux 成为随时间变化的未知值流……就像 Observable 或 BehaviorSubject。有了这些,您可以创建一个方法,如 setNextValue(String value),并将这些值泵送到 Observable/BehaviorSubject 等

的所有订阅者

使用 Flux 可以吗?还是 Flux 必须先由 Observable 类型的值流组成?

更新

我用下面的实现回答了我自己的问题。接受的答案可能会导致相同的路径,但稍微复杂。

every tutorial I see creates a Flux of static content, i.e. some pre-existing array of numbers which are finite in nature.

你会看到这个,因为大多数教程都关注如何操作和使用 Flux - 但这里的含义(你可以只使用带有静态、固定长度内容的 Flux ) 既不幸又错误。它比那更强大,并且将它与此类静态内容一起使用几乎可以肯定不是您在现实世界中看到它的使用方式。

基本上有 3 种不同的方法来实例化 Flux 以动态发射元素,如您所述:

However, I want my Flux to be a stream of unknown values over time... like an Observable or BehaviorSubject. With those, you can create a method like setNextValue(String value), and pump those values to all subscribers of the Observable/BehaviorSubject etc.

绝对 - 看看 Flux.push(). This exposes an emitter, and emitter.next(value) can be called whenever you wish. This stream can go on for as long as you want it to (infinitely, if desired.) Flux.create() 本质上是 Flux.push() 的多线程变体,它也可能有用。

Flux.generate() 可能也值得一看 - 这有点像 Flux.push() 的 "on-demand" 版本,您只在下游消费者时通过回调发出下一个元素请求它,而不是随时发出。这并不总是实用的,但如果用例使其可行,则使用此方法是有意义的,因为它尊重背压,因此可以保证不会用超出其处理能力的更多请求压倒消费者。

可以这样实现:

private EmitterProcessor<String> processor;
private FluxSink<String> statusSink;
private Flux<String> status;

public constructor() {
    this.processor = EmitterProcessor.create();
    this.statusSink = this.processor.sink(FluxSink.OverflowStrategy.BUFFER);
    this.status = this.processor.publish().autoConnect();
}

public Flux<String> getStatus() {
    return this.status;
}

public void setStatus(String status) {
    this.statusSink.next(status);
}