如何向 ConnectableFlowable 发送取消信号?

How to send cancel signal to ConnectableFlowable?

我使用一次性 Flowable 来发送和订阅项目。但是当我尝试使用 ConnectableFlowable 时,我无法向发射器发送取消信号。我如何理解 flowable 是在 Flowable.create 方法中设置的?

您可以通过注释和取消注释 'publish().autoConnect()' 代码片段来查看场景。

Disposable disposable = Flowable.create(emitter -> {
        AtomicBoolean isRunning = new AtomicBoolean(true);
        AtomicInteger i = new AtomicInteger();
        new Thread(() -> {
            while (isRunning.get()) {
                i.getAndIncrement();
                System.out.println("Emitting:" + i.get());
                emitter.onNext(i.get());
                try {
                    Thread.sleep(1_000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        emitter.setCancellable(() -> {
            System.out.println("Cancelled");
            isRunning.set(false);
        });
    }, BackpressureStrategy.BUFFER)
            .publish() //comment here
            .autoConnect() //and here
            .subscribe(s -> {
                System.out.println("Subscribed:" + s);
            });

    Thread.sleep(10_000);
    disposable.dispose();
    Thread.sleep(100_000);

有一个 overload 可以让您访问 Disposable 以取消连接:

SerialDisposable sd = new SerialDisposable();

source.publish().autoConnect(1, sd::set);

// ...

sd.dispose();