一组动态的发布者都通过相同的通量发射
Dynamic set of publishers all emiting through the same flux
我正在尝试构建一种可以通过热 Flux(输出)发出的集线器服务,但您也可以 register/unregister Flux producers/publishers(输入)
我知道我可以做类似的事情:
class Hub<T> {
/**
* @return unregister function
*/
Function<Void, Void> registerProducer(final Flux<T> flux) { ... }
Disposable subscribe(Consumer<? super T> consumer) {
if (out == null) {
// obviously this will not work!
out = Flux.merge(producer1, producer2, ...).share();
}
return out;
}
}
...但是由于这些 "producers" 已注册和未注册,我如何向现有订阅的流量添加新的流量源?或从中删除未注册的来源?
TIA!
Flux
在设计上是不可变的,因此正如您在问题中所暗示的那样,没有办法只 "update" 原位存在的 Flux
。
通常我建议避免直接使用 Processor
。但是,这是 Processor
可能是唯一明智的选择的(罕见的)情况之一,因为您本质上希望根据您注册的生产者动态发布元素。类似于:
class Hub<T> {
private final FluxProcessor<T, T> processor;
private final FluxSink<T> sink;
public Hub() {
this.processor = DirectProcessor.<T>create().serialize();
this.sink = processor.sink();
}
public Disposable registerProducer(Flux<T> flux) {
return flux.subscribe(sink::next);
}
public Flux<T> read() {
return processor;
}
}
如果您想删除生产者,那么您可以跟踪从 registerProducer()
返回的 Disposable
并在完成后调用 dispose()
。
我正在尝试构建一种可以通过热 Flux(输出)发出的集线器服务,但您也可以 register/unregister Flux producers/publishers(输入)
我知道我可以做类似的事情:
class Hub<T> {
/**
* @return unregister function
*/
Function<Void, Void> registerProducer(final Flux<T> flux) { ... }
Disposable subscribe(Consumer<? super T> consumer) {
if (out == null) {
// obviously this will not work!
out = Flux.merge(producer1, producer2, ...).share();
}
return out;
}
}
...但是由于这些 "producers" 已注册和未注册,我如何向现有订阅的流量添加新的流量源?或从中删除未注册的来源?
TIA!
Flux
在设计上是不可变的,因此正如您在问题中所暗示的那样,没有办法只 "update" 原位存在的 Flux
。
通常我建议避免直接使用 Processor
。但是,这是 Processor
可能是唯一明智的选择的(罕见的)情况之一,因为您本质上希望根据您注册的生产者动态发布元素。类似于:
class Hub<T> {
private final FluxProcessor<T, T> processor;
private final FluxSink<T> sink;
public Hub() {
this.processor = DirectProcessor.<T>create().serialize();
this.sink = processor.sink();
}
public Disposable registerProducer(Flux<T> flux) {
return flux.subscribe(sink::next);
}
public Flux<T> read() {
return processor;
}
}
如果您想删除生产者,那么您可以跟踪从 registerProducer()
返回的 Disposable
并在完成后调用 dispose()
。