一组动态的发布者都通过相同的通量发射

Dynamic set of publishers all emiting through the same flux

我正在尝试构建一种可以通过热 Flux(输出)发出的集线器服务,但您也可以 register/unregister Flux producers/publishers(输入)

我知道我可以做类似的事情:

    class Hub<T> {
        /**
         * @return unregister function
         */
        Function<Void, Void> registerProducer(final Flux<T> flux) { ... }

        Disposable subscribe(Consumer<? super T> consumer) {
            if (out == null) { 
                // obviously this will not work!
                out = Flux.merge(producer1, producer2, ...).share();
            }
            return out;
        }
    }

...但是由于这些 "producers" 已注册和未注册,我如何向现有订阅的流量添加新的流量源?或从中删除未注册的来源?

TIA!

Flux 在设计上是不可变的,因此正如您在问题中所暗示的那样,没有办法只 "update" 原位存在的 Flux

通常我建议避免直接使用 Processor。但是,这是 Processor 可能是唯一明智的选择的(罕见的)情况之一,因为您本质上希望根据您注册的生产者动态发布元素。类似于:

class Hub<T> {

    private final FluxProcessor<T, T> processor;
    private final FluxSink<T> sink;

    public Hub() {
        this.processor = DirectProcessor.<T>create().serialize();
        this.sink = processor.sink();
    }

    public Disposable registerProducer(Flux<T> flux) {
        return flux.subscribe(sink::next);
    }

    public Flux<T> read() {
        return processor;
    }
}

如果您想删除生产者,那么您可以跟踪从 registerProducer() 返回的 Disposable 并在完成后调用 dispose()